diff --git a/doc/requirements-doc.txt b/doc/requirements-doc.txt index b92994d26..5d953d346 100644 --- a/doc/requirements-doc.txt +++ b/doc/requirements-doc.txt @@ -7,7 +7,6 @@ numpy opencv-python pyarrow pyyaml -psutil recommonmark redis sphinx diff --git a/doc/source/resources.rst b/doc/source/resources.rst index 05e44b8ea..e0dc9d742 100644 --- a/doc/source/resources.rst +++ b/doc/source/resources.rst @@ -32,8 +32,8 @@ through ``ray.init``, do the following. ray.init(num_cpus=8, num_gpus=1) If the number of CPUs is unspecified, Ray will automatically determine the -number by running ``psutil.cpu_count()``. If the number of GPUs is unspecified, -Ray will attempt to automatically detect the number of GPUs. +number by running ``multiprocessing.cpu_count()``. If the number of GPUs is +unspecified, Ray will attempt to automatically detect the number of GPUs. Specifying a task's CPU and GPU requirements ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/python/ray/local_scheduler/local_scheduler_services.py b/python/ray/local_scheduler/local_scheduler_services.py index 1f6b79a22..f7847ce55 100644 --- a/python/ray/local_scheduler/local_scheduler_services.py +++ b/python/ray/local_scheduler/local_scheduler_services.py @@ -2,8 +2,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import multiprocessing import os -import psutil import random import subprocess import sys @@ -108,7 +108,7 @@ def start_local_scheduler(plasma_store_name, for resource_name, resource_quantity in static_resources.items() ]) else: - resource_argument = "CPU,{}".format(psutil.cpu_count()) + resource_argument = "CPU,{}".format(multiprocessing.cpu_count()) command += ["-c", resource_argument] if use_valgrind: diff --git a/python/ray/services.py b/python/ray/services.py index 9d933a050..3a421437c 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -5,6 +5,7 @@ from __future__ import print_function import binascii import json import logging +import multiprocessing import os import random import resource @@ -18,7 +19,6 @@ import time from collections import OrderedDict, namedtuple from datetime import datetime -import psutil import redis import pyarrow @@ -872,7 +872,7 @@ def check_and_update_resources(resources, use_raylet): if "CPU" not in resources: # By default, use the number of hardware execution threads for the # number of cores. - resources["CPU"] = psutil.cpu_count() + resources["CPU"] = multiprocessing.cpu_count() # See if CUDA_VISIBLE_DEVICES has already been set. gpu_ids = ray.utils.get_cuda_visible_devices() @@ -1013,7 +1013,7 @@ def start_raylet(redis_address, # Limit the number of workers that can be started in parallel by the # raylet. However, make sure it is at least 1. maximum_startup_concurrency = max( - 1, min(psutil.cpu_count(), static_resources["CPU"])) + 1, min(multiprocessing.cpu_count(), static_resources["CPU"])) # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'. resource_argument = ",".join([ @@ -1073,18 +1073,18 @@ def start_raylet(redis_address, return raylet_name -def start_objstore(node_ip_address, - redis_address, - object_manager_port=None, - store_stdout_file=None, - store_stderr_file=None, - manager_stdout_file=None, - manager_stderr_file=None, - objstore_memory=None, - cleanup=True, - plasma_directory=None, - huge_pages=False, - use_raylet=False): +def start_plasma_store(node_ip_address, + redis_address, + object_manager_port=None, + store_stdout_file=None, + store_stderr_file=None, + manager_stdout_file=None, + manager_stderr_file=None, + objstore_memory=None, + cleanup=True, + plasma_directory=None, + huge_pages=False, + use_raylet=False): """This method starts an object store process. Args: @@ -1121,7 +1121,7 @@ def start_objstore(node_ip_address, """ if objstore_memory is None: # Compute a fraction of the system memory for the Plasma store to use. - system_memory = psutil.virtual_memory().total + system_memory = ray.utils.get_system_memory() if sys.platform == "linux" or sys.platform == "linux2": # On linux we use /dev/shm, its size is half the size of the # physical memory. To not overflow it, we set the plasma memory @@ -1150,6 +1150,8 @@ def start_objstore(node_ip_address, else: objstore_memory = int(system_memory * 0.8) # Start the Plasma store. + logger.info("Starting the Plasma object store with {0:.2f} GB memory." + .format(objstore_memory // 10**9)) plasma_store_name, p1 = ray.plasma.start_plasma_store( plasma_store_memory=objstore_memory, use_profiler=RUN_PLASMA_STORE_PROFILER, @@ -1403,7 +1405,7 @@ def start_ray_processes(address_info=None, for resource_dict in resources: cpus = resource_dict.get("CPU") workers_per_local_scheduler.append(cpus if cpus is not None else - psutil.cpu_count()) + multiprocessing.cpu_count()) if address_info is None: address_info = {} @@ -1507,7 +1509,7 @@ def start_ray_processes(address_info=None, "plasma_store_{}".format(i), redirect_output) plasma_manager_stdout_file, plasma_manager_stderr_file = new_log_files( "plasma_manager_{}".format(i), redirect_output) - object_store_address = start_objstore( + object_store_address = start_plasma_store( node_ip_address, redis_address, object_manager_port=object_manager_ports[i], diff --git a/python/ray/utils.py b/python/ray/utils.py index 82a37b67e..0f6adaea9 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -7,6 +7,7 @@ import functools import hashlib import numpy as np import os +import subprocess import sys import threading import time @@ -266,6 +267,68 @@ def resources_from_resource_arguments(default_num_cpus, default_num_gpus, return resources +# This function is copied and modified from +# https://github.com/giampaolo/psutil/blob/5bd44f8afcecbfb0db479ce230c790fc2c56569a/psutil/tests/test_linux.py#L132-L138 # noqa: E501 +def vmstat(stat): + """Run vmstat and get a particular statistic. + + Args: + stat: The statistic that we are interested in retrieving. + + Returns: + The parsed output. + """ + out = subprocess.check_output(["vmstat", "-s"]) + stat = stat.encode("ascii") + for line in out.split(b"\n"): + line = line.strip() + if stat in line: + return int(line.split(b" ")[0]) + raise ValueError("Can't find {} in 'vmstat' output.".format(stat)) + + +# This function is copied and modified from +# https://github.com/giampaolo/psutil/blob/5e90b0a7f3fccb177445a186cc4fac62cfadb510/psutil/tests/test_osx.py#L29-L38 # noqa: E501 +def sysctl(command): + """Run a sysctl command and parse the output. + + Args: + command: A sysctl command with an argument, for example, + ["sysctl", "hw.memsize"]. + + Returns: + The parsed output. + """ + out = subprocess.check_output(command) + result = out.split(b" ")[1] + try: + return int(result) + except ValueError: + return result + + +def get_system_memory(): + """Return the total amount of system memory in bytes. + + Returns: + The total amount of system memory in bytes. + """ + # Use psutil if it is available. + try: + import psutil + return psutil.virtual_memory().total + except ImportError: + pass + + if sys.platform == "linux" or sys.platform == "linux2": + # Handle Linux. + bytes_in_kilobyte = 1024 + return vmstat("total memory") * bytes_in_kilobyte + else: + # Handle MacOS. + return sysctl(["sysctl", "hw.memsize"]) + + def check_oversized_pickle(pickled, name, obj_type, worker): """Send a warning message if the pickled object is too large. diff --git a/python/setup.py b/python/setup.py index e8f57918c..7f16620df 100644 --- a/python/setup.py +++ b/python/setup.py @@ -134,7 +134,6 @@ setup( "funcsigs", "click", "colorama", - "psutil", "pytest", "pyyaml", "redis", diff --git a/test/stress_tests.py b/test/stress_tests.py index 95b0d9192..d41cb58ad 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -169,7 +169,7 @@ def ray_start_reconstruction(request): manager_stdout_file, manager_stderr_file = (ray.services.new_log_files( "plasma_manager_{}".format(i), True)) plasma_addresses.append( - ray.services.start_objstore( + ray.services.start_plasma_store( node_ip_address, redis_address, objstore_memory=objstore_memory,