diff --git a/.travis.yml b/.travis.yml index 70f548f83..3633198f7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -69,14 +69,14 @@ matrix: script: - cd build - - bash ../src/common/test/run_valgrind.sh - - bash ../src/plasma/test/run_valgrind.sh - - bash ../src/local_scheduler/test/run_valgrind.sh + # - bash ../src/common/test/run_valgrind.sh + # - bash ../src/plasma/test/run_valgrind.sh + # - bash ../src/local_scheduler/test/run_valgrind.sh - bash ../src/ray/test/run_object_manager_valgrind.sh - cd .. - - python ./python/ray/plasma/test/test.py valgrind - - python ./python/ray/local_scheduler/test/test.py valgrind + # - python ./python/ray/plasma/test/test.py valgrind + # - python ./python/ray/local_scheduler/test/test.py valgrind # - python ./python/ray/global_scheduler/test/test.py valgrind # Build Linux wheels. @@ -107,16 +107,23 @@ matrix: env: - PYTHON=3.5 - RAY_USE_NEW_GCS=on - - RAY_USE_XRAY=1 + # Test legacy Ray. - os: linux dist: trusty - env: PYTHON=3.5 RAY_USE_XRAY=1 + env: PYTHON=3.5 RAY_USE_XRAY=0 install: - ./.travis/install-dependencies.sh - export PATH="$HOME/miniconda/bin:$PATH" - ./.travis/install-ray.sh - ./.travis/install-cython-examples.sh + + - cd build + - bash ../src/common/test/run_tests.sh + - bash ../src/plasma/test/run_tests.sh + - bash ../src/local_scheduler/test/run_tests.sh + - cd .. 
+ script: - export PATH="$HOME/miniconda/bin:$PATH" # The following is needed so cloudpickle can find some of the @@ -128,8 +135,8 @@ matrix: - python -m pytest -v python/ray/common/test/test.py - python -m pytest -v python/ray/common/redis_module/runtest.py - python -m pytest -v python/ray/plasma/test/test.py - # - python -m pytest -v python/ray/local_scheduler/test/test.py - # - python -m pytest -v python/ray/global_scheduler/test/test.py + - python -m pytest -v python/ray/local_scheduler/test/test.py + - python -m pytest -v python/ray/global_scheduler/test/test.py - python -m pytest -v python/ray/test/test_global_state.py - python -m pytest -v python/ray/test/test_queue.py @@ -190,9 +197,6 @@ install: - ./src/ray/util/logging_test --gtest_filter=PrintLogTest* - ./src/ray/util/signal_test - - bash ../src/common/test/run_tests.sh - - bash ../src/plasma/test/run_tests.sh - - bash ../src/local_scheduler/test/run_tests.sh - cd .. script: @@ -203,11 +207,11 @@ script: # module is only found if the test directory is in the PYTHONPATH. 
- export PYTHONPATH="$PYTHONPATH:./test/" - - python -m pytest -v python/ray/common/test/test.py - - python -m pytest -v python/ray/common/redis_module/runtest.py - - python -m pytest -v python/ray/plasma/test/test.py - - python -m pytest -v python/ray/local_scheduler/test/test.py - - python -m pytest -v python/ray/global_scheduler/test/test.py + # - python -m pytest -v python/ray/common/test/test.py + # - python -m pytest -v python/ray/common/redis_module/runtest.py + # - python -m pytest -v python/ray/plasma/test/test.py + # - python -m pytest -v python/ray/local_scheduler/test/test.py + # - python -m pytest -v python/ray/global_scheduler/test/test.py - python -m pytest -v python/ray/test/test_global_state.py - python -m pytest -v python/ray/test/test_queue.py diff --git a/cmake/Modules/ArrowExternalProject.cmake b/cmake/Modules/ArrowExternalProject.cmake index 07c48d97f..08d250b81 100644 --- a/cmake/Modules/ArrowExternalProject.cmake +++ b/cmake/Modules/ArrowExternalProject.cmake @@ -15,10 +15,10 @@ # - PLASMA_SHARED_LIB set(arrow_URL https://github.com/apache/arrow.git) -# The PR for this commit is https://github.com/apache/arrow/pull/2664. We +# The PR for this commit is https://github.com/apache/arrow/pull/2792. We # include the link here to make it easier to find the right commit because # Arrow often rewrites git history and invalidates certain commits. 
-set(arrow_TAG 3545186d6997b943ffc3d79634f2d08eefbd7322) +set(arrow_TAG 2d0d3d0dc51999fbaafb15d8b8362a1ef3de2ef7) set(ARROW_INSTALL_PREFIX ${CMAKE_CURRENT_BINARY_DIR}/external/arrow-install) set(ARROW_HOME ${ARROW_INSTALL_PREFIX}) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 906d650d2..99c3e3ba2 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -132,13 +132,15 @@ class GlobalState(object): use_raylet = self.redis_client.get("UseRaylet") if use_raylet is not None: - self.use_raylet = int(use_raylet) == 1 - elif os.environ.get("RAY_USE_XRAY") == "1": + self.use_raylet = bool(int(use_raylet)) + elif os.environ.get("RAY_USE_XRAY") == "0": # This environment variable is used in our testing setup. - print("Detected environment variable 'RAY_USE_XRAY'.") - self.use_raylet = True - else: + print("Detected environment variable 'RAY_USE_XRAY' with value " + "{}. This turns OFF xray.".format( + os.environ.get("RAY_USE_XRAY"))) self.use_raylet = False + else: + self.use_raylet = True # Get the rest of the information. self.redis_clients = [] @@ -1310,8 +1312,10 @@ class GlobalState(object): else: clients = self.client_table() for client in clients: - for key, value in client["Resources"].items(): - resources[key] += value + # Only count resources from live clients. + if client["IsInsertion"]: + for key, value in client["Resources"].items(): + resources[key] += value return dict(resources) @@ -1379,8 +1383,6 @@ class GlobalState(object): if local_scheduler_id not in local_scheduler_ids: del available_resources_by_id[local_scheduler_id] else: - # TODO(rliaw): Is this a fair assumption? 
- # Assumes the number of Redis clients does not change subscribe_clients = [ redis_client.pubsub(ignore_subscribe_messages=True) for redis_client in self.redis_clients diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index 0e123bd67..f9cb1a08b 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -5,6 +5,7 @@ from __future__ import print_function import hashlib import inspect import json +import sys import time import traceback from collections import ( @@ -342,6 +343,14 @@ class FunctionActorManager(object): checkpoint_interval = int(checkpoint_interval) actor_method_names = json.loads(decode(actor_method_names)) + # In Python 2, json loads strings as unicode, so convert them back to + # strings. + if sys.version_info < (3, 0): + actor_method_names = [ + method_name.encode("ascii") + for method_name in actor_method_names + ] + # Create a temporary actor with some temporary methods so that if # the actor fails to be unpickled, the temporary actor can be used # (just to produce error messages and to prevent the driver from diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index 37aad62ee..0e262e705 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -55,7 +55,7 @@ class TestGlobalScheduler(unittest.TestCase): # Start one Redis server and N pairs of (plasma, local_scheduler) self.node_ip_address = "127.0.0.1" redis_address, redis_shards = services.start_redis( - self.node_ip_address) + self.node_ip_address, use_raylet=False) redis_port = services.get_port(redis_address) time.sleep(0.1) # Create a client for the global state store. 
diff --git a/python/ray/plasma/test/test.py b/python/ray/plasma/test/test.py index a67f2d255..bc2418f00 100644 --- a/python/ray/plasma/test/test.py +++ b/python/ray/plasma/test/test.py @@ -125,7 +125,7 @@ class TestPlasmaManager(unittest.TestCase): store_name1, self.p2 = start_plasma_store(use_valgrind=USE_VALGRIND) store_name2, self.p3 = start_plasma_store(use_valgrind=USE_VALGRIND) # Start a Redis server. - redis_address, _ = services.start_redis("127.0.0.1") + redis_address, _ = services.start_redis("127.0.0.1", use_raylet=False) # Start two PlasmaManagers. manager_name1, self.p4, self.port1 = ray.plasma.start_plasma_manager( store_name1, redis_address, use_valgrind=USE_VALGRIND) @@ -483,7 +483,8 @@ class TestPlasmaManagerRecovery(unittest.TestCase): self.store_name, self.p2 = start_plasma_store( use_valgrind=USE_VALGRIND) # Start a Redis server. - self.redis_address, _ = services.start_redis("127.0.0.1") + self.redis_address, _ = services.start_redis( + "127.0.0.1", use_raylet=False) # Start a PlasmaManagers. manager_name, self.p3, self.port1 = ray.plasma.start_plasma_manager( self.store_name, self.redis_address, use_valgrind=USE_VALGRIND) diff --git a/python/ray/profiling.py b/python/ray/profiling.py index e4c2d438f..a16dd9d7a 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -230,8 +230,9 @@ class RayLogSpanRaylet(object): value: The attribute value. """ if not isinstance(key, str) or not isinstance(value, str): - raise ValueError("The extra_data argument must be a " - "dictionary mapping strings to strings.") + raise ValueError("The arguments 'key' and 'value' must both be " + "strings. 
Instead they are {} and {}.".format( + key, value)) self.extra_data[key] = value def __enter__(self): @@ -250,7 +251,8 @@ class RayLogSpanRaylet(object): for key, value in self.extra_data.items(): if not isinstance(key, str) or not isinstance(value, str): raise ValueError("The extra_data argument must be a " - "dictionary mapping strings to strings.") + "dictionary mapping strings to strings. " + "Instead it is {}.".format(self.extra_data)) if type is not None: extra_data = json.dumps({ diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index b28fc4e17..c4fd13b67 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -169,9 +169,9 @@ def cli(logging_level, logging_format): help="the file that contains the autoscaling config") @click.option( "--use-raylet", - is_flag=True, default=None, - help="use the raylet code path") + type=bool, + help="use the raylet code path, this defaults to true") @click.option( "--no-redirect-worker-output", is_flag=True, @@ -207,10 +207,16 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, if redis_address is not None: redis_address = services.address_to_ip(redis_address) - if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1": - # This environment variable is used in our testing setup. - logger.info("Detected environment variable 'RAY_USE_XRAY'.") - use_raylet = True + if use_raylet is None: + if os.environ.get("RAY_USE_XRAY") == "0": + # This environment variable is used in our testing setup. + logger.info("Detected environment variable 'RAY_USE_XRAY' with " + "value {}. This turns OFF xray.".format( + os.environ.get("RAY_USE_XRAY"))) + use_raylet = False + else: + use_raylet = True + if not use_raylet and redis_password is not None: raise Exception("Setting the 'redis-password' argument is not " "supported in legacy Ray. 
To run Ray with " diff --git a/python/ray/services.py b/python/ray/services.py index a887b2743..c78e7556b 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -430,7 +430,7 @@ def start_redis(node_ip_address, redis_shard_ports=None, num_redis_shards=1, redis_max_clients=None, - use_raylet=False, + use_raylet=True, redirect_output=False, redirect_worker_output=False, cleanup=True, @@ -450,8 +450,7 @@ def start_redis(node_ip_address, shard. redis_max_clients: If this is provided, Ray will attempt to configure Redis with this maxclients number. - use_raylet: True if the new raylet code path should be used. This is - not supported yet. + use_raylet: True if the new raylet code path should be used. redirect_output (bool): True if output should be redirected to a file and false otherwise. redirect_worker_output (bool): True if worker output should be @@ -1100,7 +1099,7 @@ def start_plasma_store(node_ip_address, cleanup=True, plasma_directory=None, huge_pages=False, - use_raylet=False, + use_raylet=True, plasma_store_socket_name=None, redis_password=None): """This method starts an object store process. @@ -1130,8 +1129,7 @@ def start_plasma_store(node_ip_address, be created. huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. - use_raylet: True if the new raylet code path should be used. This is - not supported yet. + use_raylet: True if the new raylet code path should be used. redis_password (str): The password of the redis server. Return: @@ -1359,7 +1357,7 @@ def start_ray_processes(address_info=None, plasma_directory=None, huge_pages=False, autoscaling_config=None, - use_raylet=False, + use_raylet=True, plasma_store_socket_name=None, raylet_socket_name=None, temp_dir=None): @@ -1417,8 +1415,7 @@ def start_ray_processes(address_info=None, huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. 
autoscaling_config: path to autoscaling config file. - use_raylet: True if the new raylet code path should be used. This is - not supported yet. + use_raylet: True if the new raylet code path should be used. plasma_store_socket_name (str): If provided, it will specify the socket name used by the plasma store. raylet_socket_name (str): If provided, it will specify the socket path @@ -1692,7 +1689,7 @@ def start_ray_node(node_ip_address, resources=None, plasma_directory=None, huge_pages=False, - use_raylet=False, + use_raylet=True, plasma_store_socket_name=None, raylet_socket_name=None, temp_dir=None): @@ -1730,8 +1727,7 @@ def start_ray_node(node_ip_address, be created. huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. - use_raylet: True if the new raylet code path should be used. This is - not supported yet. + use_raylet: True if the new raylet code path should be used. plasma_store_socket_name (str): If provided, it will specify the socket name used by the plasma store. raylet_socket_name (str): If provided, it will specify the socket path @@ -1788,7 +1784,7 @@ def start_ray_head(address_info=None, plasma_directory=None, huge_pages=False, autoscaling_config=None, - use_raylet=False, + use_raylet=True, plasma_store_socket_name=None, raylet_socket_name=None, temp_dir=None): @@ -1840,8 +1836,7 @@ def start_ray_head(address_info=None, huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. autoscaling_config: path to autoscaling config file. - use_raylet: True if the new raylet code path should be used. This is - not supported yet. + use_raylet: True if the new raylet code path should be used. plasma_store_socket_name (str): If provided, it will specify the socket name used by the plasma store. 
raylet_socket_name (str): If provided, it will specify the socket path diff --git a/python/ray/test/test_global_state.py b/python/ray/test/test_global_state.py index 7b12ee022..c5501dc9c 100644 --- a/python/ray/test/test_global_state.py +++ b/python/ray/test/test_global_state.py @@ -2,57 +2,57 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import pytest import time import ray -def setup_module(): - if not ray.worker.global_worker.connected: - ray.init(num_cpus=1) - - # Finish initializing Ray. Otherwise available_resources() does not - # reflect resource use of submitted tasks - ray.get(cpu_task.remote(0)) +@pytest.fixture +def ray_start(): + # Start the Ray processes. + ray.init(num_cpus=1) + yield None + # The code after the yield will run as teardown code. + ray.shutdown() -@ray.remote(num_cpus=1) -def cpu_task(seconds): - time.sleep(seconds) +def test_replenish_resources(ray_start): + cluster_resources = ray.global_state.cluster_resources() + available_resources = ray.global_state.available_resources() + assert cluster_resources == available_resources + @ray.remote + def cpu_task(): + pass + + ray.get(cpu_task.remote()) + start = time.time() + resources_reset = False -class TestAvailableResources(object): timeout = 10 - - def test_no_tasks(self): - cluster_resources = ray.global_state.cluster_resources() + while not resources_reset and time.time() - start < timeout: available_resources = ray.global_state.available_resources() - assert cluster_resources == available_resources + resources_reset = (cluster_resources == available_resources) - def test_replenish_resources(self): - cluster_resources = ray.global_state.cluster_resources() + assert resources_reset - ray.get(cpu_task.remote(0)) - start = time.time() - resources_reset = False - while not resources_reset and time.time() - start < self.timeout: - available_resources = ray.global_state.available_resources() - resources_reset = 
(cluster_resources == available_resources) +def test_uses_resources(ray_start): + cluster_resources = ray.global_state.cluster_resources() - assert resources_reset + @ray.remote + def cpu_task(): + time.sleep(1) - def test_uses_resources(self): - cluster_resources = ray.global_state.cluster_resources() - task_id = cpu_task.remote(1) - start = time.time() - resource_used = False + cpu_task.remote() + resource_used = False - while not resource_used and time.time() - start < self.timeout: - available_resources = ray.global_state.available_resources() - resource_used = available_resources[ - "CPU"] == cluster_resources["CPU"] - 1 + start = time.time() + timeout = 10 + while not resource_used and time.time() - start < timeout: + available_resources = ray.global_state.available_resources() + resource_used = available_resources[ + "CPU"] == cluster_resources["CPU"] - 1 - assert resource_used - - ray.get(task_id) # clean up to reset resources + assert resource_used diff --git a/python/ray/test/test_ray_init.py b/python/ray/test/test_ray_init.py index 62d581003..a64dd4a94 100644 --- a/python/ray/test/test_ray_init.py +++ b/python/ray/test/test_ray_init.py @@ -25,19 +25,11 @@ def shutdown_only(): class TestRedisPassword(object): - @pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") != "on" - and os.environ.get("RAY_USE_XRAY"), - reason="Redis authentication works for raylet and old GCS.") - def test_exceptions(self, password, shutdown_only): - with pytest.raises(Exception): - ray.init(redis_password=password) - @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="New GCS API doesn't support Redis authentication yet.") @pytest.mark.skipif( - not os.environ.get("RAY_USE_XRAY"), + os.environ.get("RAY_USE_XRAY") == "0", reason="Redis authentication is not supported in legacy Ray.") def test_redis_password(self, password, shutdown_only): # Workaround for https://github.com/ray-project/ray/issues/3045 diff --git a/python/ray/worker.py b/python/ray/worker.py 
index e19c43375..bce109cd8 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1223,7 +1223,7 @@ def _initialize_serialization(driver_id, worker=global_worker): def get_address_info_from_redis_helper(redis_address, node_ip_address, - use_raylet=False, + use_raylet=True, redis_password=None): redis_ip_address, redis_port = redis_address.split(":") # For this command to work, some other client (on the same machine as @@ -1333,7 +1333,7 @@ def get_address_info_from_redis_helper(redis_address, def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5, - use_raylet=False, + use_raylet=True, redis_password=None): counter = 0 while True: @@ -1497,10 +1497,15 @@ def _init(address_info=None, else: driver_mode = SCRIPT_MODE - if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1": - # This environment variable is used in our testing setup. - logger.info("Detected environment variable 'RAY_USE_XRAY'.") - use_raylet = True + if use_raylet is None: + if os.environ.get("RAY_USE_XRAY") == "0": + # This environment variable is used in our testing setup. + logger.info("Detected environment variable 'RAY_USE_XRAY' with " + "value {}. This turns OFF xray.".format( + os.environ.get("RAY_USE_XRAY"))) + use_raylet = False + else: + use_raylet = True # Get addresses of existing services. if address_info is None: @@ -1762,10 +1767,16 @@ def init(redis_address=None, else: raise Exception("Perhaps you called ray.init twice by accident?") - if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1": - # This environment variable is used in our testing setup. - logger.info("Detected environment variable 'RAY_USE_XRAY'.") - use_raylet = True + if use_raylet is None: + if os.environ.get("RAY_USE_XRAY") == "0": + # This environment variable is used in our testing setup. + logger.info("Detected environment variable 'RAY_USE_XRAY' with " + "value {}. 
This turns OFF xray.".format( + os.environ.get("RAY_USE_XRAY"))) + use_raylet = False + else: + use_raylet = True + if not use_raylet and redis_password is not None: raise Exception("Setting the 'redis_password' argument is not " "supported in legacy Ray. To run Ray with " @@ -1993,7 +2004,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, - use_raylet=False, + use_raylet=True, redis_password=None): """Connect this worker to the local scheduler, to Plasma, and to Redis. diff --git a/src/ray/object_manager/connection_pool.cc b/src/ray/object_manager/connection_pool.cc index 58508f977..2104eaa4a 100644 --- a/src/ray/object_manager/connection_pool.cc +++ b/src/ray/object_manager/connection_pool.cc @@ -79,7 +79,7 @@ void ConnectionPool::Remove(ReceiverMapType &conn_map, const ClientID &client_id auto &connections = it->second; int64_t pos = std::find(connections.begin(), connections.end(), conn) - connections.begin(); - if (pos >= (int64_t)connections.size()) { + if (pos >= static_cast<int64_t>(connections.size())) { return; } connections.erase(connections.begin() + pos); diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index fdb362359..6651f2042 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -150,6 +150,7 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk CreateChunkState::REFERENCED); create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED; create_buffer_state_[object_id].num_seals_remaining--; + RAY_CHECK(create_buffer_state_[object_id].num_seals_remaining >= 0); RAY_LOG(DEBUG) << "SealChunk" << object_id << " " << create_buffer_state_[object_id].num_seals_remaining; if (create_buffer_state_[object_id].num_seals_remaining == 0) { diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 38c7cdd45..a3e43ebbb 100644 
--- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -110,8 +110,9 @@ ray::Status ObjectDirectory::GetInformation(const ClientID &client_id, if (result_client_id == ClientID::nil() || !data.is_insertion) { fail_callback(); } else { - const auto &info = RemoteConnectionInfo(client_id, data.node_manager_address, - (uint16_t)data.object_manager_port); + const auto &info = + RemoteConnectionInfo(client_id, data.node_manager_address, + static_cast<uint16_t>(data.object_manager_port)); success_callback(info); } return ray::Status::OK(); diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 84be11066..e73565020 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -97,6 +97,7 @@ void ObjectManager::StopIOService() { void ObjectManager::HandleObjectAdded(const ObjectInfoT &object_info) { // Notify the object directory that the object has been added to this node. 
ObjectID object_id = ObjectID::from_binary(object_info.object_id); + RAY_CHECK(local_objects_.count(object_id) == 0); local_objects_[object_id] = object_info; ray::Status status = object_directory_->ReportObjectAdded(object_id, client_id_, object_info); @@ -122,7 +123,9 @@ void ObjectManager::HandleObjectAdded(const ObjectInfoT &object_info) { } void ObjectManager::NotifyDirectoryObjectDeleted(const ObjectID &object_id) { - local_objects_.erase(object_id); + auto it = local_objects_.find(object_id); + RAY_CHECK(it != local_objects_.end()); + local_objects_.erase(it); ray::Status status = object_directory_->ReportObjectRemoved(object_id, client_id_); } @@ -352,6 +355,10 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { send_service_.post([this, client_id, object_id, data_size, metadata_size, chunk_index, info]() { + // NOTE: When this callback executes, it's possible that the object + // will have already been evicted. It's also possible that the + // object could be in the process of being transferred to this + // object manager from another object manager. ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index, info); }); @@ -398,15 +405,19 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id, // Fail on status not okay. The object is local, and there is // no other anticipated error here. - RAY_CHECK_OK(chunk_status.second); + ray::Status status = chunk_status.second; + if (!chunk_status.second.ok()) { + RAY_LOG(WARNING) << "Attempting to push object " << object_id + << " which is not local. It may have been evicted."; + RAY_RETURN_NOT_OK(status); + } // Create buffer. 
flatbuffers::FlatBufferBuilder fbb; - // TODO(hme): use to_flatbuf auto message = object_manager_protocol::CreatePushRequestMessage( - fbb, fbb.CreateString(object_id.binary()), chunk_index, data_size, metadata_size); + fbb, to_flatbuf(fbb, object_id), chunk_index, data_size, metadata_size); fbb.Finish(message); - ray::Status status = conn->WriteMessage( + status = conn->WriteMessage( static_cast<int64_t>(object_manager_protocol::MessageType::PushRequest), fbb.GetSize(), fbb.GetBufferPointer()); if (!status.ok()) { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 0db61e7dd..8bbc28273 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -401,7 +401,7 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl } // Locate the client id in remote client table and update available resources based on // the received heartbeat information. - auto it = this->cluster_resource_map_.find(client_id); + auto it = cluster_resource_map_.find(client_id); if (it == cluster_resource_map_.end()) { // Haven't received the client registration for this client yet, skip this heartbeat. RAY_LOG(INFO) << "[HeartbeatAdded]: received heartbeat from unknown client id " @@ -1286,8 +1286,7 @@ void NodeManager::AssignTask(Task &task) { auto acquired_resources = local_available_resources_.Acquire(spec.GetRequiredResources()); const auto &my_client_id = gcs_client_->client_table().GetLocalClientId(); - RAY_CHECK( - this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources())); + RAY_CHECK(cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources())); if (spec.IsActorCreationTask()) { // Check that we are not placing an actor creation task on a node with 0 CPUs. 
@@ -1394,8 +1393,9 @@ void NodeManager::FinishAssignedTask(Worker &worker) { local_available_resources_.Release(worker.GetTaskResourceIds()); worker.ResetTaskResourceIds(); - RAY_CHECK(this->cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()] - .Release(task.GetTaskSpecification().GetRequiredResources())); + RAY_CHECK( + cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( + task.GetTaskSpecification().GetRequiredResources())); } // If the finished task was an actor task, mark the returned dummy object as diff --git a/test/actor_test.py b/test/actor_test.py index 13aeb78f7..0ba8701f8 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1232,7 +1232,7 @@ def test_blocking_actor_task(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") def test_exception_raised_when_actor_node_dies(shutdown_only): ray.worker._init(start_ray_local=True, num_local_schedulers=2, num_cpus=1) @@ -1279,7 +1279,7 @@ def test_exception_raised_when_actor_node_dies(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -1329,7 +1329,7 @@ def test_local_scheduler_dying(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -1466,7 +1466,7 @@ def setup_counter_actor(test_checkpoint=False, @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -1496,7 +1496,7 @@ def test_checkpointing(shutdown_only): @pytest.mark.skipif( - 
os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -1527,7 +1527,7 @@ def test_remote_checkpoint(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -1558,7 +1558,7 @@ def test_lost_checkpoint(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -1591,7 +1591,7 @@ def test_checkpoint_exception(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -1663,7 +1663,7 @@ def test_distributed_handle(self): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -1822,7 +1822,7 @@ def _test_nondeterministic_reconstruction(num_forks, num_items_per_fork, @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -1860,7 +1860,7 @@ def setup_queue_actor(): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") def test_fork(setup_queue_actor): queue = setup_queue_actor @@ -1879,7 +1879,7 @@ def test_fork(setup_queue_actor): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + 
os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") def test_fork_consistency(setup_queue_actor): queue = setup_queue_actor diff --git a/test/component_failures_test.py b/test/component_failures_test.py index 3a57452e6..2957ef626 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -36,7 +36,7 @@ def shutdown_only(): # This test checks that when a worker dies in the middle of a get, the plasma # store and raylet will not die. @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -89,7 +89,7 @@ def test_dying_worker_get_raylet(shutdown_only): # This test checks that when a driver dies in the middle of a get, the plasma # store and raylet will not die. @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -107,8 +107,8 @@ def test_dying_driver_get(shutdown_only): driver = """ import ray ray.init("{}") -ray.get(ray.ObjectID({})) -""".format(address_info["redis_address"], x_id.id()) +ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}"))) +""".format(address_info["redis_address"], x_id.hex()) p = run_string_as_driver_nonblocking(driver) # Make sure the driver is running. @@ -135,7 +135,7 @@ ray.get(ray.ObjectID({})) # This test checks that when a worker dies in the middle of a get, the # plasma store and manager will not die. 
@pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY", False), + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -171,7 +171,7 @@ def test_dying_worker_get(ray_start_workers_separate): # This test checks that when a worker dies in the middle of a wait, the plasma # store and raylet will not die. @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -216,7 +216,7 @@ def test_dying_worker_wait_raylet(shutdown_only): # This test checks that when a driver dies in the middle of a wait, the plasma # store and raylet will not die. @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -234,8 +234,8 @@ def test_dying_driver_wait(shutdown_only): driver = """ import ray ray.init("{}") -ray.wait([ray.ObjectID({})]) -""".format(address_info["redis_address"], x_id.id()) +ray.wait([ray.ObjectID(ray.utils.hex_to_binary("{}"))]) +""".format(address_info["redis_address"], x_id.hex()) p = run_string_as_driver_nonblocking(driver) # Make sure the driver is running. @@ -262,7 +262,7 @@ ray.wait([ray.ObjectID({})]) # This test checks that when a worker dies in the middle of a wait, the # plasma store and manager will not die. 
@pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY", False), + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -342,7 +342,7 @@ def test_worker_failed(ray_start_workers_separate_multinode): def _test_component_failed(component_type): """Kill a component on all worker nodes and check workload succeeds.""" # Raylet is able to pass a harder failure test than legacy ray. - use_raylet = os.environ.get("RAY_USE_XRAY") == "1" + use_raylet = os.environ.get("RAY_USE_XRAY") != "0" # Start with 4 workers and 4 cores. num_local_schedulers = 4 @@ -452,7 +452,7 @@ def check_components_alive(component_type, check_component_alive): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only makes sense with xray.") def test_raylet_failed(): # Kill all local schedulers on worker nodes. @@ -466,7 +466,7 @@ def test_raylet_failed(): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not make sense with xray.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", @@ -485,7 +485,7 @@ def test_local_scheduler_failed(): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not make sense with xray.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", diff --git a/test/failure_test.py b/test/failure_test.py index 9cd25962b..2d01896b3 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -395,7 +395,7 @@ def ray_start_object_store_memory(): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") def test_put_error1(ray_start_object_store_memory): num_objects = 3 @@ -439,7 +439,7 @@ def test_put_error1(ray_start_object_store_memory): 
@pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") def test_put_error2(ray_start_object_store_memory): # This is the same as the previous test, but it calls ray.put directly. @@ -495,7 +495,7 @@ def test_version_mismatch(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") def test_warning_monitor_died(shutdown_only): ray.init(num_cpus=0) @@ -539,7 +539,7 @@ def test_export_large_objects(ray_start_regular): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") def test_warning_for_infeasible_tasks(ray_start_regular): # Check that we get warning messages for infeasible tasks. @@ -592,7 +592,7 @@ def ray_start_two_nodes(): # Note that this test will take at least 10 seconds because it must wait for # the monitor to detect enough missed heartbeats. @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") def test_warning_for_dead_node(ray_start_two_nodes): # Wait for the raylet to appear in the client table. 
diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 821fefa7b..d83765e97 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -11,222 +11,222 @@ ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) DOCKER_SHA=$($ROOT_DIR/../../build-docker.sh --output-sha --no-cache) echo "Using Docker image" $DOCKER_SHA -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env PongDeterministic-v0 \ --run A3C \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env Pong-ram-v4 \ --run A3C \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env PongDeterministic-v0 \ --run A2C \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 2}' \ --config '{"kl_coeff": 1.0, "num_sgd_iter": 10, "lr": 1e-4, "sgd_minibatch_size": 64, "train_batch_size": 2000, "num_workers": 1, "model": {"free_log_std": true}}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 2}' \ --config '{"simple_optimizer": false, "num_sgd_iter": 2, "model": 
{"use_lstm": true}}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 2}' \ --config '{"simple_optimizer": true, "num_sgd_iter": 2, "model": {"use_lstm": true}}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v1 \ --run PPO \ --stop '{"training_iteration": 2}' \ --config '{"kl_coeff": 1.0, "num_sgd_iter": 10, "lr": 1e-4, "sgd_minibatch_size": 64, "train_batch_size": 2000, "num_workers": 1, "use_gae": false, "batch_mode": "complete_episodes"}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env Pendulum-v0 \ --run ES \ --stop '{"training_iteration": 2}' \ --config '{"stepsize": 0.01, "episodes_per_batch": 20, "train_batch_size": 100, "num_workers": 2}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env Pong-v0 \ --run ES \ --stop '{"training_iteration": 2}' \ --config '{"stepsize": 0.01, "episodes_per_batch": 20, "train_batch_size": 100, "num_workers": 2}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run A3C \ --stop '{"training_iteration": 2}' \ -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run DQN \ --stop '{"training_iteration": 2}' \ --config '{"lr": 1e-3, 
"schedule_max_timesteps": 100000, "exploration_fraction": 0.1, "exploration_final_eps": 0.02, "dueling": false, "hiddens": [], "model": {"fcnet_hiddens": [64], "fcnet_activation": "relu"}}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run DQN \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run APEX \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2, "timesteps_per_iteration": 1000, "gpu": false, "min_iter_time_s": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env FrozenLake-v0 \ --run DQN \ --stop '{"training_iteration": 2}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env FrozenLake-v0 \ --run PPO \ --stop '{"training_iteration": 2}' \ --config '{"num_sgd_iter": 10, "sgd_minibatch_size": 64, "train_batch_size": 1000, "num_workers": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env PongDeterministic-v4 \ --run DQN \ --stop '{"training_iteration": 2}' \ --config '{"lr": 1e-4, "schedule_max_timesteps": 2000000, "buffer_size": 10000, "exploration_fraction": 0.1, "exploration_final_eps": 0.01, "sample_batch_size": 4, "learning_starts": 10000, "target_network_update_freq": 1000, "gamma": 0.99, "prioritized_replay": true}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G 
--memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env MontezumaRevenge-v0 \ --run PPO \ --stop '{"training_iteration": 2}' \ --config '{"kl_coeff": 1.0, "num_sgd_iter": 10, "lr": 1e-4, "sgd_minibatch_size": 64, "train_batch_size": 2000, "num_workers": 1, "model": {"dim": 40, "conv_filters": [[16, [8, 8], 4], [32, [4, 4], 2], [512, [5, 5], 1]]}}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v1 \ --run A3C \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2, "model": {"use_lstm": true}}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run DQN \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run PG \ --stop '{"training_iteration": 2}' \ --config '{"sample_batch_size": 500, "num_workers": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run PG \ --stop '{"training_iteration": 2}' \ --config '{"sample_batch_size": 500, "num_workers": 1, "model": {"use_lstm": true, "max_seq_len": 100}}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run PG \ --stop '{"training_iteration": 2}' \ --config '{"sample_batch_size": 500, "num_workers": 1, "num_envs_per_worker": 10}' -docker 
run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env Pong-v0 \ --run PG \ --stop '{"training_iteration": 2}' \ --config '{"sample_batch_size": 500, "num_workers": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env FrozenLake-v0 \ --run PG \ --stop '{"training_iteration": 2}' \ --config '{"sample_batch_size": 500, "num_workers": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env Pendulum-v0 \ --run DDPG \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 2}' \ --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 2}' \ --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "model": {"use_lstm": true}}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 2}' \ --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "num_parallel_data_loaders": 2, "replay_proportion": 1.0}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker 
run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v0 \ --run IMPALA \ --stop '{"training_iteration": 2}' \ --config '{"num_gpus": 0, "num_workers": 2, "min_iter_time_s": 1, "num_parallel_data_loaders": 2, "replay_proportion": 1.0, "model": {"use_lstm": true}}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env MountainCarContinuous-v0 \ --run DDPG \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ rllib train \ --env MountainCarContinuous-v0 \ --run DDPG \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env Pendulum-v0 \ --run APEX_DDPG \ @@ -234,97 +234,97 @@ docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2, "optimizer": {"num_replay_buffer_shards": 1}, "learning_starts": 100, "min_iter_time_s": 1}' -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ sh /ray/test/jenkins_tests/multi_node_tests/test_rllib_eval.sh -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_local.py -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_checkpoint_restore.py -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G 
$DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_policy_evaluator.py -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_nested_spaces.py -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_serving_env.py -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_lstm.py -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_multi_agent_env.py -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_supported_spaces.py -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/tune_mnist_ray.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/pbt_example.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/hyperband_example.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/async_hyperband_example.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G 
--memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/tune_mnist_ray_hyperband.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/tune_mnist_async_hyperband.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/hyperopt_example.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/tune_mnist_keras.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/mnist_pytorch.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/mnist_pytorch_trainable.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/tune/examples/genetic_example.py \ --smoke-test -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/examples/multiagent_cartpole.py --num-iters=2 -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/examples/multiagent_two_trainers.py --num-iters=2 -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G 
$DOCKER_SHA \ python /ray/python/ray/rllib/examples/cartpole_lstm.py --run=PPO --stop=200 -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/examples/cartpole_lstm.py --run=IMPALA --stop=100 -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/examples/cartpole_lstm.py --stop=200 --use-prev-action-reward -docker run -e "RAY_USE_XRAY=1" --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/experimental/sgd/test_sgd.py --num-iters=2 # No Xray for PyTorch -docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run -e RAY_USE_XRAY=0 --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env PongDeterministic-v4 \ --run A3C \ @@ -332,7 +332,7 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --config '{"num_workers": 2, "use_pytorch": true, "model": {"use_lstm": false, "grayscale": true, "zero_mean": false, "dim": 84, "channel_major": true}, "preprocessor_pref": "rllib"}' # No Xray for PyTorch -docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ +docker run -e RAY_USE_XRAY=0 --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v1 \ --run A3C \ diff --git a/test/multi_node_test.py b/test/multi_node_test.py index a1f0bd87b..1cc783529 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -202,7 +202,7 @@ def ray_start_head_with_resources(): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") def test_drivers_release_resources(ray_start_head_with_resources): redis_address = ray_start_head_with_resources diff --git a/test/runtest.py b/test/runtest.py index 
a9efb2fb3..c0a5dbc54 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -990,7 +990,7 @@ def test_running_function_on_all_workers(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray (nor is it intended to).") def test_logging_api(shutdown_only): ray.init(num_cpus=1) @@ -1038,7 +1038,7 @@ def test_logging_api(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") def test_profiling_api(shutdown_only): ray.init(num_cpus=2) @@ -1198,7 +1198,7 @@ def test_multithreading(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") def test_free_objects_multi_node(shutdown_only): ray.worker._init( @@ -1639,7 +1639,7 @@ def test_gpu_ids(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") def test_zero_cpus(shutdown_only): ray.init(num_cpus=0) @@ -1669,7 +1669,7 @@ def test_zero_cpus_actor(shutdown_only): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") != "1", + os.environ.get("RAY_USE_XRAY") == "0", reason="This test only works with xray.") def test_fractional_resources(shutdown_only): ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1}) @@ -2043,7 +2043,7 @@ def test_blocking_tasks(shutdown_only): object_ids = [f.remote(i, j) for j in range(2)] return ray.wait(object_ids, num_returns=len(object_ids)) - if os.environ.get("RAY_USE_XRAY") == "1": + if os.environ.get("RAY_USE_XRAY") != "0": ray.get([h.remote(i) for i in range(4)]) @ray.remote @@ -2350,7 +2350,7 @@ def test_log_file_api(shutdown_only): os.environ.get("RAY_USE_NEW_GCS") == "on", reason="New GCS API doesn't have a Python API yet.") @pytest.mark.skipif( - 
os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray (nor is it intended to).") def test_task_profile_api(shutdown_only): ray.init(num_cpus=1, redirect_output=True) @@ -2419,7 +2419,7 @@ def test_workers(shutdown_only): os.environ.get("RAY_USE_NEW_GCS") == "on", reason="New GCS API doesn't have a Python API yet.") @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") def test_dump_trace_file(shutdown_only): ray.init(num_cpus=1, redirect_output=True) @@ -2463,7 +2463,7 @@ def test_dump_trace_file(shutdown_only): os.environ.get("RAY_USE_NEW_GCS") == "on", reason="New GCS API doesn't have a Python API yet.") @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") def test_flush_api(shutdown_only): ray.init(num_cpus=1) diff --git a/test/stress_tests.py b/test/stress_tests.py index 6fc7cc487..7c37d9091 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -162,7 +162,7 @@ def ray_start_reconstruction(request): # Start the Redis global state store. node_ip_address = "127.0.0.1" - use_raylet = os.environ.get("RAY_USE_XRAY") == "1" + use_raylet = os.environ.get("RAY_USE_XRAY") != "0" redis_address, redis_shards = ray.services.start_redis( node_ip_address, use_raylet=use_raylet) redis_ip_address = ray.services.get_ip_address(redis_address) @@ -186,7 +186,8 @@ def ray_start_reconstruction(request): store_stdout_file=store_stdout_file, store_stderr_file=store_stderr_file, manager_stdout_file=manager_stdout_file, - manager_stderr_file=manager_stderr_file)) + manager_stderr_file=manager_stderr_file, + use_raylet=use_raylet)) # Start the rest of the services in the Ray cluster. 
address_info = { @@ -401,7 +402,7 @@ def wait_for_errors(error_check): @pytest.mark.skipif( - os.environ.get("RAY_USE_XRAY") == "1", + os.environ.get("RAY_USE_XRAY") != "0", reason="This test does not work with xray yet.") @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on",