diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 8a5aab376..56940e33c 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -187,6 +187,7 @@ public class RunManager { "0", // number of initial workers String.valueOf(maximumStartupConcurrency), ResourceUtil.getResourcesStringFromMap(rayConfig.resources), + "", // The internal config list. buildPythonWorkerCommand(), // python worker command buildWorkerCommandRaylet() // java worker command ); diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index f5889d4b7..8f4e391fb 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -193,13 +193,19 @@ def cli(logging_level, logging_format): "--temp-dir", default=None, help="manually specify the root temporary dir of the Ray process") +@click.option( + "--internal-config", + default=None, + type=str, + help="Do NOT use this. This is for debugging/development purposes ONLY.") def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, object_store_memory, num_workers, num_cpus, num_gpus, resources, head, no_ui, block, plasma_directory, huge_pages, autoscaling_config, no_redirect_worker_output, no_redirect_output, - plasma_store_socket_name, raylet_socket_name, temp_dir): + plasma_store_socket_name, raylet_socket_name, temp_dir, + internal_config): # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -269,7 +275,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, autoscaling_config=autoscaling_config, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=internal_config) logger.info(address_info) logger.info( "\nStarted Ray on this node. You can add additional nodes to " @@ -348,7 +355,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, huge_pages=huge_pages, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=internal_config) logger.info(address_info) logger.info("\nStarted Ray on this node. If you wish to terminate the " "processes that have been started, run\n\n" diff --git a/python/ray/services.py b/python/ray/services.py index c5bf15d8c..cf0869fd0 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -860,6 +860,7 @@ def start_raylet(redis_address, stdout_file=None, stderr_file=None, cleanup=True, + config=None, redis_password=None): """Start a raylet, which is a combined local scheduler and object manager. @@ -890,11 +891,16 @@ def start_raylet(redis_address, cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the Python process that imported services exits. + config (dict|None): Optional Raylet configuration that will + override defaults in RayConfig. redis_password (str): The password of the redis server. Returns: The raylet socket name. """ + config = config or {} + config_str = ",".join(["{},{}".format(*kv) for kv in config.items()]) + if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") @@ -906,11 +912,8 @@ def start_raylet(redis_address, 1, min(multiprocessing.cpu_count(), static_resources["CPU"])) # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'. - resource_argument = ",".join([ - "{},{}".format(resource_name, resource_value) - for resource_name, resource_value in zip(static_resources.keys(), - static_resources.values()) - ]) + resource_argument = ",".join( + ["{},{}".format(*kv) for kv in static_resources.items()]) gcs_ip_address, gcs_port = redis_address.split(":") @@ -948,6 +951,7 @@ def start_raylet(redis_address, str(num_workers), str(maximum_startup_concurrency), resource_argument, + config_str, start_worker_command, "", # Worker command for Java, not needed for Python. redis_password or "", @@ -1209,7 +1213,8 @@ def start_raylet_monitor(redis_address, stdout_file=None, stderr_file=None, cleanup=True, - redis_password=None): + redis_password=None, + config=None): """Run a process to monitor the other processes. Args: @@ -1223,10 +1228,14 @@ def start_raylet_monitor(redis_address, Python process that imported services exits. This is True by default. redis_password (str): The password of the redis server. + config (dict|None): Optional configuration that will + override defaults in RayConfig. """ gcs_ip_address, gcs_port = redis_address.split(":") redis_password = redis_password or "" - command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port] + config = config or {} + config_str = ",".join(["{},{}".format(*kv) for kv in config.items()]) + command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port, config_str] if redis_password: command += [redis_password] p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) @@ -1259,7 +1268,8 @@ def start_ray_processes(address_info=None, autoscaling_config=None, plasma_store_socket_name=None, raylet_socket_name=None, - temp_dir=None): + temp_dir=None, + _internal_config=None): """Helper method to start Ray processes. Args: @@ -1324,6 +1334,8 @@ def start_ray_processes(address_info=None, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. Returns: A dictionary of the address information for the processes that were @@ -1335,6 +1347,8 @@ def start_ray_processes(address_info=None, logger.info("Process STDOUT and STDERR is being redirected to {}.".format( get_logs_dir_path())) + config = json.loads(_internal_config) if _internal_config else None + if resources is None: resources = {} if not isinstance(resources, list): @@ -1395,7 +1409,8 @@ def start_ray_processes(address_info=None, stdout_file=monitor_stdout_file, stderr_file=monitor_stderr_file, cleanup=cleanup, - redis_password=redis_password) + redis_password=redis_password, + config=config) if redis_shards == []: # Get redis shards from primary redis instance. redis_ip_address, redis_port = redis_address.split(":") @@ -1473,7 +1488,8 @@ def start_ray_processes(address_info=None, stdout_file=raylet_stdout_file, stderr_file=raylet_stderr_file, cleanup=cleanup, - redis_password=redis_password)) + redis_password=redis_password, + config=config)) # Try to start the web UI. if include_webui: @@ -1506,7 +1522,8 @@ def start_ray_node(node_ip_address, huge_pages=False, plasma_store_socket_name=None, raylet_socket_name=None, - temp_dir=None): + temp_dir=None, + _internal_config=None): """Start the Ray processes for a single node. This assumes that the Ray processes on some master node have already been @@ -1550,6 +1567,8 @@ def start_ray_node(node_ip_address, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. Returns: A dictionary of the address information for the processes that were @@ -1577,7 +1596,8 @@ def start_ray_node(node_ip_address, huge_pages=huge_pages, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=_internal_config) def start_ray_head(address_info=None, @@ -1604,7 +1624,8 @@ def start_ray_head(address_info=None, autoscaling_config=None, plasma_store_socket_name=None, raylet_socket_name=None, - temp_dir=None): + temp_dir=None, + _internal_config=None): """Start Ray in local mode. Args: @@ -1665,6 +1686,8 @@ def start_ray_head(address_info=None, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. Returns: A dictionary of the address information for the processes that were @@ -1697,4 +1720,5 @@ def start_ray_head(address_info=None, autoscaling_config=autoscaling_config, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=_internal_config) diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index 0d91f499e..1c21ef067 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -111,15 +111,17 @@ class Cluster(object): assert not node.any_processes_alive(), ( "There are zombie processes left over after killing.") - def wait_for_nodes(self, retries=20): + def wait_for_nodes(self, retries=30): """Waits for all nodes to be registered with global state. + By default, waits for 3 seconds. + Args: retries (int): Number of times to retry checking client table. """ for i in range(retries): if not ray.is_initialized() or not self._check_registered_nodes(): - time.sleep(0.3) + time.sleep(0.1) else: break diff --git a/python/ray/worker.py b/python/ray/worker.py index 28fa47058..1fd4043c2 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1293,7 +1293,8 @@ def _init(address_info=None, driver_id=None, plasma_store_socket_name=None, raylet_socket_name=None, - temp_dir=None): + temp_dir=None, + _internal_config=None): """Helper method to connect to an existing Ray cluster or start a new one. This method handles two cases. Either a Ray cluster already exists and we @@ -1355,6 +1356,8 @@ def _init(address_info=None, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. Returns: Address information about the started processes. @@ -1427,7 +1430,8 @@ def _init(address_info=None, include_webui=include_webui, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=_internal_config) else: if redis_address is None: raise Exception("When connecting to an existing cluster, " @@ -1468,6 +1472,9 @@ def _init(address_info=None, if raylet_socket_name is not None: raise Exception("When connecting to an existing cluster, " "raylet_socket_name must not be provided.") + if _internal_config is not None: + raise Exception("When connecting to an existing cluster, " + "_internal_config must not be provided.") # Get the node IP address if one is not provided. if node_ip_address is None: @@ -1530,6 +1537,7 @@ def init(redis_address=None, plasma_store_socket_name=None, raylet_socket_name=None, temp_dir=None, + _internal_config=None, use_raylet=None): """Connect to an existing Ray cluster or start one and connect to it. @@ -1601,6 +1609,8 @@ def init(redis_address=None, used by the raylet process. temp_dir (str): If provided, it will specify the root temporary directory for the Ray process. + _internal_config (str): JSON configuration for overriding + RayConfig defaults. For testing purposes ONLY. Returns: Address information about the started processes. @@ -1658,7 +1668,8 @@ def init(redis_address=None, driver_id=driver_id, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, - temp_dir=temp_dir) + temp_dir=temp_dir, + _internal_config=_internal_config) for hook in _post_init_hooks: hook() return ret diff --git a/src/ray/ray_config.h b/src/ray/ray_config.h index bc1999307..1b6dee349 100644 --- a/src/ray/ray_config.h +++ b/src/ray/ray_config.h @@ -1,7 +1,9 @@ #ifndef RAY_CONFIG_H #define RAY_CONFIG_H -#include +#include + +#include "ray/util/logging.h" class RayConfig { public: @@ -102,6 +104,86 @@ class RayConfig { int num_workers_per_process() const { return num_workers_per_process_; } + void initialize(const std::unordered_map &config_map) { + RAY_CHECK(!initialized_); + for (auto const &pair : config_map) { + // We use a big chain of if else statements because C++ doesn't allow + // switch statements on strings. + if (pair.first == "ray_protocol_version") { + ray_protocol_version_ = pair.second; + } else if (pair.first == "handler_warning_timeout_ms") { + handler_warning_timeout_ms_ = pair.second; + } else if (pair.first == "heartbeat_timeout_milliseconds") { + heartbeat_timeout_milliseconds_ = pair.second; + } else if (pair.first == "num_heartbeats_timeout") { + num_heartbeats_timeout_ = pair.second; + } else if (pair.first == "num_heartbeats_warning") { + num_heartbeats_warning_ = pair.second; + } else if (pair.first == "initial_reconstruction_timeout_milliseconds") { + initial_reconstruction_timeout_milliseconds_ = pair.second; + } else if (pair.first == "get_timeout_milliseconds") { + get_timeout_milliseconds_ = pair.second; + } else if (pair.first == "worker_get_request_size") { + worker_get_request_size_ = pair.second; + } else if (pair.first == "worker_fetch_request_size") { + worker_fetch_request_size_ = pair.second; + } else if (pair.first == "max_lineage_size") { + max_lineage_size_ = pair.second; + } else if (pair.first == "actor_max_dummy_objects") { + actor_max_dummy_objects_ = pair.second; + } else if (pair.first == "num_connect_attempts") { + num_connect_attempts_ = pair.second; + } else if (pair.first == "connect_timeout_milliseconds") { + connect_timeout_milliseconds_ = pair.second; + } else if (pair.first == "local_scheduler_fetch_timeout_milliseconds") { + local_scheduler_fetch_timeout_milliseconds_ = pair.second; + } else if (pair.first == "local_scheduler_reconstruction_timeout_milliseconds") { + local_scheduler_reconstruction_timeout_milliseconds_ = pair.second; + } else if (pair.first == "max_num_to_reconstruct") { + max_num_to_reconstruct_ = pair.second; + } else if (pair.first == "local_scheduler_fetch_request_size") { + local_scheduler_fetch_request_size_ = pair.second; + } else if (pair.first == "kill_worker_timeout_milliseconds") { + kill_worker_timeout_milliseconds_ = pair.second; + } else if (pair.first == "manager_timeout_milliseconds") { + manager_timeout_milliseconds_ = pair.second; + } else if (pair.first == "buf_size") { + buf_size_ = pair.second; + } else if (pair.first == "max_time_for_handler_milliseconds") { + max_time_for_handler_milliseconds_ = pair.second; + } else if (pair.first == "size_limit") { + size_limit_ = pair.second; + } else if (pair.first == "num_elements_limit") { + num_elements_limit_ = pair.second; + } else if (pair.first == "max_time_for_loop") { + max_time_for_loop_ = pair.second; + } else if (pair.first == "redis_db_connect_retries") { + redis_db_connect_retries_ = pair.second; + } else if (pair.first == "redis_db_connect_wait_milliseconds") { + redis_db_connect_wait_milliseconds_ = pair.second; + } else if (pair.first == "plasma_default_release_delay") { + plasma_default_release_delay_ = pair.second; + } else if (pair.first == "L3_cache_size_bytes") { + L3_cache_size_bytes_ = pair.second; + } else if (pair.first == "max_tasks_to_spillback") { + max_tasks_to_spillback_ = pair.second; + } else if (pair.first == "actor_creation_num_spillbacks_warning") { + actor_creation_num_spillbacks_warning_ = pair.second; + } else if (pair.first == "node_manager_forward_task_retry_timeout_milliseconds") { + node_manager_forward_task_retry_timeout_milliseconds_ = pair.second; + } else if (pair.first == "object_manager_pull_timeout_ms") { + object_manager_pull_timeout_ms_ = pair.second; + } else if (pair.first == "object_manager_push_timeout_ms") { + object_manager_push_timeout_ms_ = pair.second; + } else if (pair.first == "object_manager_default_chunk_size") { + object_manager_default_chunk_size_ = pair.second; + } else { + RAY_LOG(FATAL) << "Received unexpected config parameter " << pair.first; + } + } + initialized_ = true; + } + private: RayConfig() : ray_protocol_version_(0x0000000000000000), @@ -138,7 +220,8 @@ class RayConfig { object_manager_pull_timeout_ms_(100), object_manager_push_timeout_ms_(10000), object_manager_default_chunk_size_(1000000), - num_workers_per_process_(1) {} + num_workers_per_process_(1), + initialized_(false) {} ~RayConfig() {} @@ -263,6 +346,10 @@ class RayConfig { /// Number of workers per process int num_workers_per_process_; + + /// Whether the initialization of the instance has been called before. + /// The RayConfig instance can only (and must) be initialized once. + bool initialized_; }; #endif // RAY_CONFIG_H diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 3e2483ee6..93ccc28fd 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -20,7 +20,7 @@ int main(int argc, char *argv[]) { ray::RayLogLevel::INFO, /*log_dir=*/""); ray::RayLog::InstallFailureSignalHandler(); - RAY_CHECK(argc == 13 || argc == 14); + RAY_CHECK(argc == 14 || argc == 15); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); @@ -32,13 +32,29 @@ int main(int argc, char *argv[]) { int num_initial_workers = std::stoi(argv[8]); int maximum_startup_concurrency = std::stoi(argv[9]); const std::string static_resource_list = std::string(argv[10]); - const std::string python_worker_command = std::string(argv[11]); - const std::string java_worker_command = std::string(argv[12]); - const std::string redis_password = (argc == 14 ? std::string(argv[13]) : ""); + const std::string config_list = std::string(argv[11]); + const std::string python_worker_command = std::string(argv[12]); + const std::string java_worker_command = std::string(argv[13]); + const std::string redis_password = (argc == 15 ? std::string(argv[14]) : ""); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; std::unordered_map static_resource_conf; + std::unordered_map raylet_config; + + // Parse the configuration list. + std::istringstream config_string(config_list); + std::string config_name; + std::string config_value; + + while (std::getline(config_string, config_name, ',')) { + RAY_CHECK(std::getline(config_string, config_value, ',')); + // TODO(rkn): The line below could throw an exception. What should we do about this? + raylet_config[config_name] = std::stoi(config_value); + } + + RayConfig::instance().initialize(raylet_config); + // Parse the resource list. std::istringstream resource_string(static_resource_list); std::string resource_name; @@ -49,6 +65,7 @@ int main(int argc, char *argv[]) { // TODO(rkn): The line below could throw an exception. What should we do about this? static_resource_conf[resource_name] = std::stod(resource_quantity); } + node_manager_config.resource_config = ray::raylet::ResourceSet(std::move(static_resource_conf)); RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: " diff --git a/src/ray/raylet/monitor_main.cc b/src/ray/raylet/monitor_main.cc index f997566a5..b12c3cfc2 100644 --- a/src/ray/raylet/monitor_main.cc +++ b/src/ray/raylet/monitor_main.cc @@ -1,5 +1,6 @@ #include +#include "ray/ray_config.h" #include "ray/raylet/monitor.h" #include "ray/util/util.h" @@ -8,11 +9,27 @@ int main(int argc, char *argv[]) { ray::RayLog::ShutDownRayLog, argv[0], ray::RayLogLevel::INFO, /*log_dir=*/""); ray::RayLog::InstallFailureSignalHandler(); - RAY_CHECK(argc == 3 || argc == 4); + RAY_CHECK(argc == 4 || argc == 5); const std::string redis_address = std::string(argv[1]); int redis_port = std::stoi(argv[2]); - const std::string redis_password = (argc == 4 ? std::string(argv[3]) : ""); + const std::string config_list = std::string(argv[3]); + const std::string redis_password = (argc == 5 ? std::string(argv[4]) : ""); + + std::unordered_map raylet_config; + + // Parse the configuration list. + std::istringstream config_string(config_list); + std::string config_name; + std::string config_value; + + while (std::getline(config_string, config_name, ',')) { + RAY_CHECK(std::getline(config_string, config_value, ',')); + // TODO(rkn): The line below could throw an exception. What should we do about this? + raylet_config[config_name] = std::stoi(config_value); + } + + RayConfig::instance().initialize(raylet_config); // Initialize the monitor. boost::asio::io_service io_service; diff --git a/test/multi_node_test_2.py b/test/multi_node_test_2.py index b0696f4b3..339546be1 100644 --- a/test/multi_node_test_2.py +++ b/test/multi_node_test_2.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import json import logging import pytest @@ -15,7 +16,33 @@ logger = logging.getLogger(__name__) @pytest.fixture def start_connected_cluster(): # Start the Ray processes. - g = Cluster(initialize_head=True, connect=True) + g = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "resources": dict(CPU=1), + "_internal_config": json.dumps({ + "num_heartbeats_timeout": 10 + }) + }) + yield g + # The code after the yield will run as teardown code. + ray.shutdown() + g.shutdown() + + +@pytest.fixture +def start_connected_longer_cluster(): + """Creates a cluster with a longer timeout.""" + g = Cluster( + initialize_head=True, + connect=True, + head_node_args={ + "resources": dict(CPU=1), + "_internal_config": json.dumps({ + "num_heartbeats_timeout": 20 + }) + }) yield g # The code after the yield will run as teardown code. ray.shutdown() @@ -34,6 +61,26 @@ def test_cluster(): assert not any(node.any_processes_alive() for node in g.list_all_nodes()) +def test_internal_config(start_connected_longer_cluster): + """Checks that the internal configuration setting works. + + We set the cluster to timeout nodes after 2 seconds of no timeouts. We + then remove a node, wait for 1 second to check that the cluster is out + of sync, then wait another 2 seconds (giving 1 second of leeway) to check + that the client has timed out. + """ + cluster = start_connected_longer_cluster + worker = cluster.add_node() + cluster.wait_for_nodes() + + cluster.remove_node(worker) + cluster.wait_for_nodes(retries=10) + assert ray.global_state.cluster_resources()["CPU"] == 2 + + cluster.wait_for_nodes(retries=20) + assert ray.global_state.cluster_resources()["CPU"] == 1 + + def test_wait_for_nodes(start_connected_cluster): """Unit test for `Cluster.wait_for_nodes`. @@ -45,10 +92,13 @@ def test_wait_for_nodes(start_connected_cluster): cluster.wait_for_nodes() [cluster.remove_node(w) for w in workers] cluster.wait_for_nodes() + + assert ray.global_state.cluster_resources()["CPU"] == 1 worker2 = cluster.add_node() cluster.wait_for_nodes() cluster.remove_node(worker2) cluster.wait_for_nodes() + assert ray.global_state.cluster_resources()["CPU"] == 1 def test_worker_plasma_store_failure(start_connected_cluster): diff --git a/test/stress_tests.py b/test/stress_tests.py index 307ae52e5..284bdb436 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import json import numpy as np import os import pytest @@ -216,7 +217,10 @@ def ray_start_reconstruction(request): start_ray_local=True, num_local_schedulers=num_local_schedulers, num_cpus=[1] * num_local_schedulers, - redirect_output=True) + redirect_output=True, + _internal_config=json.dumps({ + "initial_reconstruction_timeout_milliseconds": 200 + })) yield (redis_ip_address, redis_port, plasma_store_memory, num_local_schedulers) @@ -249,7 +253,6 @@ def ray_start_reconstruction(request): ray.shutdown() -@pytest.mark.skip("Add this test back once reconstruction is faster.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") @@ -291,7 +294,6 @@ def test_simple(ray_start_reconstruction): del values -@pytest.mark.skip("Add this test back once reconstruction is faster.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") @@ -348,7 +350,6 @@ def test_recursive(ray_start_reconstruction): del values -@pytest.mark.skip("Add this test back once reconstruction is faster.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.")