diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 8e277da2b..51f404f14 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -156,6 +156,7 @@ def ray_deps_setup(): url = "https://github.com/google/glog/archive/925858d9969d8ee22aabc3635af00a37891f4e25.tar.gz", sha256 = "fb86eca661497ac6f9ce2a106782a30215801bb8a7c8724c6ec38af05a90acf3", patches = [ + "//thirdparty/patches:glog-log-pid-tid.patch", "//thirdparty/patches:glog-stack-trace.patch", ], ) diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index 8581fe31d..76b0ee01a 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -237,7 +237,7 @@ build_wheels() { for pyversion in "${pyversions[@]}"; do if [ -z "${pyversion}" ]; then continue; fi "${ROOT_DIR}"/bazel-preclean.sh - git clean -f -f -x -d -e "${local_dir}" -e python/ray/dashboard/client + git clean -q -f -f -x -d -e "${local_dir}" -e python/ray/dashboard/client git checkout -q -f -- . cp -R -f -a -T -- "${backup_conda}" "${CONDA_PREFIX}" local existing_version diff --git a/ci/travis/install-dependencies.sh b/ci/travis/install-dependencies.sh index dc820b75d..c0bbb5621 100755 --- a/ci/travis/install-dependencies.sh +++ b/ci/travis/install-dependencies.sh @@ -197,19 +197,28 @@ install_dependencies() { install_nvm install_pip - if [ -n "${PYTHON-}" ]; then + if [ -n "${PYTHON-}" ] || [ "${LINT-}" = 1 ]; then install_miniconda + fi + # Install modules needed in all jobs. + pip install --no-clean dm-tree # --no-clean is due to: https://github.com/deepmind/tree/issues/5 + + if [ -n "${PYTHON-}" ]; then # PyTorch is installed first since we are using a "-f" directive to find the wheels. # We want to install the CPU version only. local torch_url="https://download.pytorch.org/whl/torch_stable.html" case "${OSTYPE}" in - linux*) pip install torch==1.5.0+cpu torchvision==0.6.0+cpu -f "${torch_url}";; darwin*) pip install torch torchvision;; - msys*) pip install torch==1.5.0+cpu torchvision==0.6.0+cpu -f "${torch_url}";; + *) pip install torch==1.5.0+cpu torchvision==0.6.0+cpu -f "${torch_url}";; esac - pip_packages=(scipy tensorflow=="${TF_VERSION:-2.1.0}" cython==0.29.0 gym \ + local tf_version + case "${OSTYPE}" in + msys) tf_version="${TF_VERSION:-2.2.0}";; + *) tf_version="${TF_VERSION:-2.1.0}";; + esac + pip_packages+=(scipy tensorflow=="${tf_version}" cython==0.29.0 gym \ opencv-python-headless pyyaml pandas==0.24.2 requests feather-format lxml openpyxl xlrd \ py-spy pytest pytest-timeout networkx tabulate aiohttp uvicorn dataclasses pygments werkzeug \ kubernetes flask grpcio pytest-sugar pytest-rerunfailures pytest-asyncio scikit-learn==0.22.2 numba \ @@ -235,17 +244,19 @@ install_dependencies() { fi if [ "${LINT-}" = 1 ]; then - install_miniconda install_linters # readthedocs has an antiquated build env. # This is a best effort to reproduce it locally to avoid doc build failures and hidden errors. - pip install -r "${WORKSPACE_DIR}"/doc/requirements-rtd.txt - pip install -r "${WORKSPACE_DIR}"/doc/requirements-doc.txt + local python_version + python_version="$(python -s -c "import sys; print('%s.%s' % sys.version_info[:2])")" + if [ "${OSTYPE}" = msys ] && [ "${python_version}" = "3.8" ]; then + { echo "WARNING: Pillow binaries not available on Windows; cannot build docs"; } 2> /dev/null + else + pip install -r "${WORKSPACE_DIR}"/doc/requirements-rtd.txt + pip install -r "${WORKSPACE_DIR}"/doc/requirements-doc.txt + fi fi - # Install modules needed in all jobs. - pip install dm-tree - # Additional RLlib dependencies. if [ "${RLLIB_TESTING-}" = 1 ]; then pip install tensorflow-probability=="${TFP_VERSION-0.8}" gast==0.2.2 \ diff --git a/python/ray/monitor.py b/python/ray/monitor.py index a1197cf69..48b9a4d86 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -53,7 +53,12 @@ class Monitor: def __del__(self): """Destruct the monitor object.""" # We close the pubsub client to avoid leaking file descriptors. - self.primary_subscribe_client.close() + try: + primary_subscribe_client = self.primary_subscribe_client + except AttributeError: + primary_subscribe_client = None + if primary_subscribe_client is not None: + primary_subscribe_client.close() def subscribe(self, channel): """Subscribe to the given channel on the primary Redis shard. diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 3e53bec7d..af3715246 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -1,4 +1,5 @@ import asyncio +import errno import json import fnmatch import os @@ -12,6 +13,9 @@ import ray import psutil # We must import psutil after ray because we bundle it with ray. +if sys.platform == "win32": + import _winapi + class RayTestTimeoutException(Exception): """Exception used to identify timeouts from test utilities.""" @@ -27,11 +31,24 @@ def _pid_alive(pid): Returns: This returns false if the process is dead. Otherwise, it returns true. """ + no_such_process = errno.EINVAL if sys.platform == "win32" else errno.ESRCH + alive = True try: - os.kill(pid, 0) - return True - except OSError: - return False + if sys.platform == "win32": + SYNCHRONIZE = 0x00100000 # access mask defined in + handle = _winapi.OpenProcess(SYNCHRONIZE, False, pid) + try: + alive = (_winapi.WaitForSingleObject(handle, 0) != + _winapi.WAIT_OBJECT_0) + finally: + _winapi.CloseHandle(handle) + else: + os.kill(pid, 0) + except OSError as ex: + if ex.errno != no_such_process: + raise + alive = False + return alive def wait_for_pid_to_exit(pid, timeout=20): diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index c926e5f87..5cd56b613 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -358,7 +358,7 @@ def test_decorator_args(ray_start_regular): with pytest.raises(Exception): @ray.remote(invalid_kwarg=0) # noqa: F811 - class Actor: + class Actor: # noqa: F811 def __init__(self): pass @@ -366,25 +366,25 @@ def test_decorator_args(ray_start_regular): with pytest.raises(Exception): @ray.remote(num_cpus=0, invalid_kwarg=0) # noqa: F811 - class Actor: + class Actor: # noqa: F811 def __init__(self): pass # This is a valid way of using the decorator. @ray.remote(num_cpus=1) # noqa: F811 - class Actor: + class Actor: # noqa: F811 def __init__(self): pass # This is a valid way of using the decorator. @ray.remote(num_gpus=1) # noqa: F811 - class Actor: + class Actor: # noqa: F811 def __init__(self): pass # This is a valid way of using the decorator. @ray.remote(num_cpus=1, num_gpus=1) # noqa: F811 - class Actor: + class Actor: # noqa: F811 def __init__(self): pass diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 3428e2a28..dd7782278 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -15,6 +15,8 @@ from ray.test_utils import (relevant_errors, wait_for_condition, wait_for_errors, wait_for_pid_to_exit, generate_internal_config_map) +SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM + @pytest.fixture def ray_checkpointable_actor_cls(request): @@ -153,7 +155,7 @@ def test_actor_restart(): pid = ray.get(actor.get_pid.remote()) results = [actor.increase.remote() for _ in range(100)] # Kill actor process, while the above task is still being executed. - os.kill(pid, signal.SIGKILL) + os.kill(pid, SIGKILL) # Make sure that all tasks were executed in order before the actor's death. res = results.pop(0) i = 1 @@ -194,7 +196,7 @@ def test_actor_restart(): # kill actor process one more time. results = [actor.increase.remote() for _ in range(100)] pid = ray.get(actor.get_pid.remote()) - os.kill(pid, signal.SIGKILL) + os.kill(pid, SIGKILL) # The actor has exceeded max restarts, and this task should fail. with pytest.raises(ray.exceptions.RayActorError): ray.get(actor.increase.remote()) @@ -235,7 +237,7 @@ def test_actor_restart_with_retry(): pid = ray.get(actor.get_pid.remote()) results = [actor.increase.remote() for _ in range(100)] # Kill actor process, while the above task is still being executed. - os.kill(pid, signal.SIGKILL) + os.kill(pid, SIGKILL) # Check that none of the tasks failed and the actor is restarted. seq = list(range(1, 101)) results = ray.get(results) @@ -255,7 +257,7 @@ def test_actor_restart_with_retry(): # kill actor process one more time. results = [actor.increase.remote() for _ in range(100)] pid = ray.get(actor.get_pid.remote()) - os.kill(pid, signal.SIGKILL) + os.kill(pid, SIGKILL) # The actor has exceeded max restarts, and this task should fail. with pytest.raises(ray.exceptions.RayActorError): ray.get(actor.increase.remote()) @@ -351,7 +353,7 @@ def test_actor_restart_without_task(ray_start_regular): pid = ray.get(actor.get_pid.remote()) p = probe.remote() - os.kill(pid, signal.SIGKILL) + os.kill(pid, SIGKILL) ray.get(p) assert wait_for_condition(lambda: not actor_resource_available()) @@ -507,7 +509,7 @@ def test_multiple_actor_restart(ray_start_cluster_head): def kill_actor(actor): """A helper function that kills an actor process.""" pid = ray.get(actor.get_pid.remote()) - os.kill(pid, signal.SIGKILL) + os.kill(pid, SIGKILL) wait_for_pid_to_exit(pid) diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py index 65918a239..c35a91762 100644 --- a/python/ray/tests/test_advanced.py +++ b/python/ray/tests/test_advanced.py @@ -334,19 +334,19 @@ def test_identical_function_names(ray_start_regular): return 1 @ray.remote # noqa: F811 - def g(): + def g(): # noqa: F811 return 2 @ray.remote # noqa: F811 - def g(): + def g(): # noqa: F811 return 3 @ray.remote # noqa: F811 - def g(): + def g(): # noqa: F811 return 4 @ray.remote # noqa: F811 - def g(): + def g(): # noqa: F811 return 5 result_values = ray.get([g.remote() for _ in range(num_calls)]) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index ba7fd39ab..ad34377de 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -1,3 +1,4 @@ +import os import shutil import tempfile import threading @@ -165,7 +166,7 @@ SMALL_CLUSTER = { }, "auth": { "ssh_user": "ubuntu", - "ssh_private_key": "/dev/null", + "ssh_private_key": os.devnull, }, "head_node": { "TestProp": 1, @@ -310,7 +311,7 @@ class AutoscalingTest(unittest.TestCase): return path def testInvalidConfig(self): - invalid_config = "/dev/null" + invalid_config = os.devnull with pytest.raises(ValueError): StandardAutoscaler( invalid_config, LoadMetrics(), update_interval_s=0) diff --git a/python/ray/tests/test_component_failures.py b/python/ray/tests/test_component_failures.py index 90c921f2f..256ad072a 100644 --- a/python/ray/tests/test_component_failures.py +++ b/python/ray/tests/test_component_failures.py @@ -1,5 +1,5 @@ import os -from signal import SIGKILL +import signal import sys import time @@ -8,6 +8,8 @@ import pytest import ray from ray.test_utils import run_string_as_driver_nonblocking, SignalActor +SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM + # This test checks that when a worker dies in the middle of a get, the plasma # store and raylet will not die. diff --git a/python/ray/tests/test_component_failures_2.py b/python/ray/tests/test_component_failures_2.py index 1a9821193..cecabc73c 100644 --- a/python/ray/tests/test_component_failures_2.py +++ b/python/ray/tests/test_component_failures_2.py @@ -11,6 +11,8 @@ import ray.ray_constants as ray_constants from ray.cluster_utils import Cluster from ray.test_utils import RayTestTimeoutException +SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM + @pytest.fixture(params=[(1, 4), (4, 4)]) def ray_start_workers_separate_multinode(request): @@ -62,7 +64,7 @@ def test_worker_failed(ray_start_workers_separate_multinode): time.sleep(0.1) # Kill the workers as the tasks execute. for pid in pids: - os.kill(pid, signal.SIGKILL) + os.kill(pid, SIGKILL) time.sleep(0.1) # Make sure that we either get the object or we get an appropriate # exception. diff --git a/python/ray/tests/test_multinode_failures.py b/python/ray/tests/test_multinode_failures.py index ba54006be..44dd0b407 100644 --- a/python/ray/tests/test_multinode_failures.py +++ b/python/ray/tests/test_multinode_failures.py @@ -11,6 +11,8 @@ import ray.ray_constants as ray_constants from ray.cluster_utils import Cluster from ray.test_utils import RayTestTimeoutException +SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM + @pytest.fixture(params=[(1, 4), (4, 4)]) def ray_start_workers_separate_multinode(request): @@ -62,7 +64,7 @@ def test_worker_failed(ray_start_workers_separate_multinode): time.sleep(0.1) # Kill the workers as the tasks execute. for pid in pids: - os.kill(pid, signal.SIGKILL) + os.kill(pid, SIGKILL) time.sleep(0.1) # Make sure that we either get the object or we get an appropriate # exception. diff --git a/python/ray/tests/test_reference_counting_2.py b/python/ray/tests/test_reference_counting_2.py index dd25b4fc9..12551f012 100644 --- a/python/ray/tests/test_reference_counting_2.py +++ b/python/ray/tests/test_reference_counting_2.py @@ -3,6 +3,7 @@ import json import logging import os import signal +import sys import numpy as np @@ -12,6 +13,8 @@ import ray import ray.cluster_utils from ray.test_utils import SignalActor, put_object, wait_for_condition +SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM + logger = logging.getLogger(__name__) @@ -269,7 +272,7 @@ def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put, _fill_object_store_and_get(final_oid_bytes) if failure: - os.kill(owner_pid, signal.SIGKILL) + os.kill(owner_pid, SIGKILL) else: # Remove all references. del head_oid diff --git a/python/ray/worker.py b/python/ray/worker.py index c2d162a52..17b8adb1c 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -9,7 +9,6 @@ import json import logging import os import redis -import signal from six.moves import queue import sys import threading @@ -900,7 +899,7 @@ atexit.register(shutdown, True) # TODO(edoakes): this should only be set in the driver. def sigterm_handler(signum, frame): - sys.exit(signal.SIGTERM) + sys.exit(signum) try: @@ -1366,7 +1365,12 @@ def disconnect(exiting_interpreter=False): worker.node = None # Disconnect the worker from the node. worker.cached_functions_to_run = [] worker.serialization_context_map.clear() - ray.actor.ActorClassMethodMetadata.reset_cache() + try: + ray_actor = ray.actor + except AttributeError: + ray_actor = None # This can occur during program termination + if ray_actor is not None: + ray_actor.ActorClassMethodMetadata.reset_cache() @contextmanager diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index d4e9cf68e..ace882258 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -591,7 +591,7 @@ class Trainer(Trainable): if tf and not tf.executing_eagerly(): return tf.Graph().as_default() else: - return open("/dev/null") # fake a no-op scope + return open(os.devnull) # fake a no-op scope with get_scope(): self._init(self.config, self.env_creator) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index c88a52fbb..a742d8b7d 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -131,9 +131,7 @@ WorkerPool::~WorkerPool() { } for (Process proc : procs_to_kill) { proc.Kill(); - } - for (Process proc : procs_to_kill) { - proc.Wait(); + // NOTE: Avoid calling Wait() here. It fails with ECHILD, as SIGCHLD is disabled. } } diff --git a/thirdparty/patches/glog-log-pid-tid.patch b/thirdparty/patches/glog-log-pid-tid.patch new file mode 100644 index 000000000..da6926702 --- /dev/null +++ b/thirdparty/patches/glog-log-pid-tid.patch @@ -0,0 +1,11 @@ +diff --git src/logging.cc src/logging.cc +--- src/logging.cc ++++ src/logging.cc +@@ -1437,3 +1437,6 @@ ++ << setfill(' ') << setw(5) ++ << static_cast(getpid()) << setfill('0') ++ << ' ' + << setfill(' ') << setw(5) + << static_cast(GetTID()) << setfill('0') + << ' ' +--