Remove dependence on psutil. Add utility functions for getting system memory. (#2892)

This commit is contained in:
Robert Nishihara
2018-09-18 00:03:29 -07:00
committed by Hao Chen
parent 61bf6c6123
commit ea9d1cc887
7 changed files with 88 additions and 25 deletions
@@ -2,8 +2,8 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import multiprocessing
import os
import psutil
import random
import subprocess
import sys
@@ -108,7 +108,7 @@ def start_local_scheduler(plasma_store_name,
for resource_name, resource_quantity in static_resources.items()
])
else:
resource_argument = "CPU,{}".format(psutil.cpu_count())
resource_argument = "CPU,{}".format(multiprocessing.cpu_count())
command += ["-c", resource_argument]
if use_valgrind:
+20 -18
View File
@@ -5,6 +5,7 @@ from __future__ import print_function
import binascii
import json
import logging
import multiprocessing
import os
import random
import resource
@@ -18,7 +19,6 @@ import time
from collections import OrderedDict, namedtuple
from datetime import datetime
import psutil
import redis
import pyarrow
@@ -872,7 +872,7 @@ def check_and_update_resources(resources, use_raylet):
if "CPU" not in resources:
# By default, use the number of hardware execution threads for the
# number of cores.
resources["CPU"] = psutil.cpu_count()
resources["CPU"] = multiprocessing.cpu_count()
# See if CUDA_VISIBLE_DEVICES has already been set.
gpu_ids = ray.utils.get_cuda_visible_devices()
@@ -1013,7 +1013,7 @@ def start_raylet(redis_address,
# Limit the number of workers that can be started in parallel by the
# raylet. However, make sure it is at least 1.
maximum_startup_concurrency = max(
1, min(psutil.cpu_count(), static_resources["CPU"]))
1, min(multiprocessing.cpu_count(), static_resources["CPU"]))
# Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
resource_argument = ",".join([
@@ -1073,18 +1073,18 @@ def start_raylet(redis_address,
return raylet_name
def start_objstore(node_ip_address,
redis_address,
object_manager_port=None,
store_stdout_file=None,
store_stderr_file=None,
manager_stdout_file=None,
manager_stderr_file=None,
objstore_memory=None,
cleanup=True,
plasma_directory=None,
huge_pages=False,
use_raylet=False):
def start_plasma_store(node_ip_address,
redis_address,
object_manager_port=None,
store_stdout_file=None,
store_stderr_file=None,
manager_stdout_file=None,
manager_stderr_file=None,
objstore_memory=None,
cleanup=True,
plasma_directory=None,
huge_pages=False,
use_raylet=False):
"""This method starts an object store process.
Args:
@@ -1121,7 +1121,7 @@ def start_objstore(node_ip_address,
"""
if objstore_memory is None:
# Compute a fraction of the system memory for the Plasma store to use.
system_memory = psutil.virtual_memory().total
system_memory = ray.utils.get_system_memory()
if sys.platform == "linux" or sys.platform == "linux2":
# On Linux we use /dev/shm, whose size is half the size of the
# physical memory. To avoid overflowing it, we set the plasma memory
@@ -1150,6 +1150,8 @@ def start_objstore(node_ip_address,
else:
objstore_memory = int(system_memory * 0.8)
# Start the Plasma store.
logger.info("Starting the Plasma object store with {0:.2f} GB memory."
.format(objstore_memory // 10**9))
plasma_store_name, p1 = ray.plasma.start_plasma_store(
plasma_store_memory=objstore_memory,
use_profiler=RUN_PLASMA_STORE_PROFILER,
@@ -1403,7 +1405,7 @@ def start_ray_processes(address_info=None,
for resource_dict in resources:
cpus = resource_dict.get("CPU")
workers_per_local_scheduler.append(cpus if cpus is not None else
psutil.cpu_count())
multiprocessing.cpu_count())
if address_info is None:
address_info = {}
@@ -1507,7 +1509,7 @@ def start_ray_processes(address_info=None,
"plasma_store_{}".format(i), redirect_output)
plasma_manager_stdout_file, plasma_manager_stderr_file = new_log_files(
"plasma_manager_{}".format(i), redirect_output)
object_store_address = start_objstore(
object_store_address = start_plasma_store(
node_ip_address,
redis_address,
object_manager_port=object_manager_ports[i],
+63
View File
@@ -7,6 +7,7 @@ import functools
import hashlib
import numpy as np
import os
import subprocess
import sys
import threading
import time
@@ -266,6 +267,68 @@ def resources_from_resource_arguments(default_num_cpus, default_num_gpus,
return resources
# This function is copied and modified from
# https://github.com/giampaolo/psutil/blob/5bd44f8afcecbfb0db479ce230c790fc2c56569a/psutil/tests/test_linux.py#L132-L138 # noqa: E501
def vmstat(stat):
    """Run ``vmstat -s`` and get a particular statistic.

    Args:
        stat: The statistic that we are interested in retrieving, given as
            text that appears on the matching output line (for example,
            "total memory").

    Returns:
        The integer at the start of the first output line that contains
        stat.

    Raises:
        ValueError: If no line of the output contains stat.
    """
    out = subprocess.check_output(["vmstat", "-s"])
    # check_output returns bytes, so search with an encoded copy and keep the
    # original text around for a readable error message.
    needle = stat.encode("ascii")
    for line in out.split(b"\n"):
        if needle in line:
            # split() with no separator skips the leading padding and copes
            # with any run of whitespace between the number and its label.
            return int(line.split()[0])
    raise ValueError("Can't find {} in 'vmstat' output.".format(stat))
# This function is copied and modified from
# https://github.com/giampaolo/psutil/blob/5e90b0a7f3fccb177445a186cc4fac62cfadb510/psutil/tests/test_osx.py#L29-L38 # noqa: E501
def sysctl(command):
    """Run a sysctl command and parse the output.

    Args:
        command: A sysctl command with an argument, for example,
            ["sysctl", "hw.memsize"].

    Returns:
        The second space-separated field of the command's output, as an
        integer when it parses as one and as the raw bytes otherwise.
    """
    fields = subprocess.check_output(command).split(b" ")
    # The first field is the key (e.g. b"hw.memsize:"); the value follows it.
    value = fields[1]
    try:
        parsed = int(value)
    except ValueError:
        # Not an integer, so hand the field back untouched.
        return value
    return parsed
def get_system_memory():
    """Return the total amount of system memory in bytes.

    psutil is used when it can be imported. Otherwise the value comes from
    vmstat on Linux and from sysctl on every other platform.

    Returns:
        The total amount of system memory in bytes.
    """
    try:
        import psutil
        return psutil.virtual_memory().total
    except ImportError:
        # psutil is optional; fall back to the command-line helpers below.
        pass

    on_linux = sys.platform in ("linux", "linux2")
    if not on_linux:
        # Every non-Linux platform is handled as MacOS.
        return sysctl(["sysctl", "hw.memsize"])

    # The vmstat figure is scaled by 1024 to convert it to bytes.
    kilobyte = 1024
    return vmstat("total memory") * kilobyte
def check_oversized_pickle(pickled, name, obj_type, worker):
"""Send a warning message if the pickled object is too large.
-1
View File
@@ -134,7 +134,6 @@ setup(
"funcsigs",
"click",
"colorama",
"psutil",
"pytest",
"pyyaml",
"redis",