Use XRay backend by default. (#3020)

* Use XRay backend by default.

* Remove irrelevant valgrind tests.

* Fix

* Move tests around.

* Fix

* Fix test

* Fix test.

* String/unicode fix.

* Fix test

* Fix unicode issue.

* Minor changes

* Fix bug in test_global_state.py.

* Fix test.

* Linting

* Try arrow change and other object manager changes.

* Use newer plasma client API

* Small updates

* Revert plasma client api change.

* Update

* Update arrow and allow SendObjectHeaders to fail.

* Update arrow

* Update python/ray/experimental/state.py

Co-Authored-By: robertnishihara <robertnishihara@gmail.com>

* Address comments.
This commit is contained in:
Robert Nishihara
2018-10-23 12:46:39 -07:00
committed by Philipp Moritz
parent 9d2e864caf
commit 9c1826ed69
24 changed files with 264 additions and 228 deletions
+11 -9
View File
@@ -132,13 +132,15 @@ class GlobalState(object):
use_raylet = self.redis_client.get("UseRaylet")
if use_raylet is not None:
self.use_raylet = int(use_raylet) == 1
elif os.environ.get("RAY_USE_XRAY") == "1":
self.use_raylet = bool(int(use_raylet))
elif os.environ.get("RAY_USE_XRAY") == "0":
# This environment variable is used in our testing setup.
print("Detected environment variable 'RAY_USE_XRAY'.")
self.use_raylet = True
else:
print("Detected environment variable 'RAY_USE_XRAY' with value "
"{}. This turns OFF xray.".format(
os.environ.get("RAY_USE_XRAY")))
self.use_raylet = False
else:
self.use_raylet = True
# Get the rest of the information.
self.redis_clients = []
@@ -1310,8 +1312,10 @@ class GlobalState(object):
else:
clients = self.client_table()
for client in clients:
for key, value in client["Resources"].items():
resources[key] += value
# Only count resources from live clients.
if client["IsInsertion"]:
for key, value in client["Resources"].items():
resources[key] += value
return dict(resources)
@@ -1379,8 +1383,6 @@ class GlobalState(object):
if local_scheduler_id not in local_scheduler_ids:
del available_resources_by_id[local_scheduler_id]
else:
# TODO(rliaw): Is this a fair assumption?
# Assumes the number of Redis clients does not change
subscribe_clients = [
redis_client.pubsub(ignore_subscribe_messages=True)
for redis_client in self.redis_clients
+9
View File
@@ -5,6 +5,7 @@ from __future__ import print_function
import hashlib
import inspect
import json
import sys
import time
import traceback
from collections import (
@@ -342,6 +343,14 @@ class FunctionActorManager(object):
checkpoint_interval = int(checkpoint_interval)
actor_method_names = json.loads(decode(actor_method_names))
# In Python 2, json loads strings as unicode, so convert them back to
# strings.
if sys.version_info < (3, 0):
actor_method_names = [
method_name.encode("ascii")
for method_name in actor_method_names
]
# Create a temporary actor with some temporary methods so that if
# the actor fails to be unpickled, the temporary actor can be used
# (just to produce error messages and to prevent the driver from
+1 -1
View File
@@ -55,7 +55,7 @@ class TestGlobalScheduler(unittest.TestCase):
# Start one Redis server and N pairs of (plasma, local_scheduler)
self.node_ip_address = "127.0.0.1"
redis_address, redis_shards = services.start_redis(
self.node_ip_address)
self.node_ip_address, use_raylet=False)
redis_port = services.get_port(redis_address)
time.sleep(0.1)
# Create a client for the global state store.
+3 -2
View File
@@ -125,7 +125,7 @@ class TestPlasmaManager(unittest.TestCase):
store_name1, self.p2 = start_plasma_store(use_valgrind=USE_VALGRIND)
store_name2, self.p3 = start_plasma_store(use_valgrind=USE_VALGRIND)
# Start a Redis server.
redis_address, _ = services.start_redis("127.0.0.1")
redis_address, _ = services.start_redis("127.0.0.1", use_raylet=False)
# Start two PlasmaManagers.
manager_name1, self.p4, self.port1 = ray.plasma.start_plasma_manager(
store_name1, redis_address, use_valgrind=USE_VALGRIND)
@@ -483,7 +483,8 @@ class TestPlasmaManagerRecovery(unittest.TestCase):
self.store_name, self.p2 = start_plasma_store(
use_valgrind=USE_VALGRIND)
# Start a Redis server.
self.redis_address, _ = services.start_redis("127.0.0.1")
self.redis_address, _ = services.start_redis(
"127.0.0.1", use_raylet=False)
# Start a PlasmaManager.
manager_name, self.p3, self.port1 = ray.plasma.start_plasma_manager(
self.store_name, self.redis_address, use_valgrind=USE_VALGRIND)
+5 -3
View File
@@ -230,8 +230,9 @@ class RayLogSpanRaylet(object):
value: The attribute value.
"""
if not isinstance(key, str) or not isinstance(value, str):
raise ValueError("The extra_data argument must be a "
"dictionary mapping strings to strings.")
raise ValueError("The arguments 'key' and 'value' must both be "
"strings. Instead they are {} and {}.".format(
key, value))
self.extra_data[key] = value
def __enter__(self):
@@ -250,7 +251,8 @@ class RayLogSpanRaylet(object):
for key, value in self.extra_data.items():
if not isinstance(key, str) or not isinstance(value, str):
raise ValueError("The extra_data argument must be a "
"dictionary mapping strings to strings.")
"dictionary mapping strings to strings. "
"Instead it is {}.".format(self.extra_data))
if type is not None:
extra_data = json.dumps({
+12 -6
View File
@@ -169,9 +169,9 @@ def cli(logging_level, logging_format):
help="the file that contains the autoscaling config")
@click.option(
"--use-raylet",
is_flag=True,
default=None,
help="use the raylet code path")
type=bool,
help="use the raylet code path, this defaults to false")
@click.option(
"--no-redirect-worker-output",
is_flag=True,
@@ -207,10 +207,16 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
if redis_address is not None:
redis_address = services.address_to_ip(redis_address)
if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1":
# This environment variable is used in our testing setup.
logger.info("Detected environment variable 'RAY_USE_XRAY'.")
use_raylet = True
if use_raylet is None:
if os.environ.get("RAY_USE_XRAY") == "0":
# This environment variable is used in our testing setup.
logger.info("Detected environment variable 'RAY_USE_XRAY' with "
"value {}. This turns OFF xray.".format(
os.environ.get("RAY_USE_XRAY")))
use_raylet = False
else:
use_raylet = True
if not use_raylet and redis_password is not None:
raise Exception("Setting the 'redis-password' argument is not "
"supported in legacy Ray. To run Ray with "
+10 -15
View File
@@ -430,7 +430,7 @@ def start_redis(node_ip_address,
redis_shard_ports=None,
num_redis_shards=1,
redis_max_clients=None,
use_raylet=False,
use_raylet=True,
redirect_output=False,
redirect_worker_output=False,
cleanup=True,
@@ -450,8 +450,7 @@ def start_redis(node_ip_address,
shard.
redis_max_clients: If this is provided, Ray will attempt to configure
Redis with this maxclients number.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
use_raylet: True if the new raylet code path should be used.
redirect_output (bool): True if output should be redirected to a file
and false otherwise.
redirect_worker_output (bool): True if worker output should be
@@ -1100,7 +1099,7 @@ def start_plasma_store(node_ip_address,
cleanup=True,
plasma_directory=None,
huge_pages=False,
use_raylet=False,
use_raylet=True,
plasma_store_socket_name=None,
redis_password=None):
"""This method starts an object store process.
@@ -1130,8 +1129,7 @@ def start_plasma_store(node_ip_address,
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
use_raylet: True if the new raylet code path should be used.
redis_password (str): The password of the redis server.
Return:
@@ -1359,7 +1357,7 @@ def start_ray_processes(address_info=None,
plasma_directory=None,
huge_pages=False,
autoscaling_config=None,
use_raylet=False,
use_raylet=True,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
@@ -1417,8 +1415,7 @@ def start_ray_processes(address_info=None,
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
autoscaling_config: path to autoscaling config file.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
use_raylet: True if the new raylet code path should be used.
plasma_store_socket_name (str): If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name (str): If provided, it will specify the socket path
@@ -1692,7 +1689,7 @@ def start_ray_node(node_ip_address,
resources=None,
plasma_directory=None,
huge_pages=False,
use_raylet=False,
use_raylet=True,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
@@ -1730,8 +1727,7 @@ def start_ray_node(node_ip_address,
be created.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
use_raylet: True if the new raylet code path should be used.
plasma_store_socket_name (str): If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name (str): If provided, it will specify the socket path
@@ -1788,7 +1784,7 @@ def start_ray_head(address_info=None,
plasma_directory=None,
huge_pages=False,
autoscaling_config=None,
use_raylet=False,
use_raylet=True,
plasma_store_socket_name=None,
raylet_socket_name=None,
temp_dir=None):
@@ -1840,8 +1836,7 @@ def start_ray_head(address_info=None,
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
autoscaling_config: path to autoscaling config file.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
use_raylet: True if the new raylet code path should be used.
plasma_store_socket_name (str): If provided, it will specify the socket
name used by the plasma store.
raylet_socket_name (str): If provided, it will specify the socket path
+36 -36
View File
@@ -2,57 +2,57 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import time
import ray
def setup_module():
if not ray.worker.global_worker.connected:
ray.init(num_cpus=1)
# Finish initializing Ray. Otherwise available_resources() does not
# reflect resource use of submitted tasks.
ray.get(cpu_task.remote(0))
@pytest.fixture
def ray_start():
# Start the Ray processes.
ray.init(num_cpus=1)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@ray.remote(num_cpus=1)
def cpu_task(seconds):
time.sleep(seconds)
def test_replenish_resources(ray_start):
cluster_resources = ray.global_state.cluster_resources()
available_resources = ray.global_state.available_resources()
assert cluster_resources == available_resources
@ray.remote
def cpu_task():
pass
ray.get(cpu_task.remote())
start = time.time()
resources_reset = False
class TestAvailableResources(object):
timeout = 10
def test_no_tasks(self):
cluster_resources = ray.global_state.cluster_resources()
while not resources_reset and time.time() - start < timeout:
available_resources = ray.global_state.available_resources()
assert cluster_resources == available_resources
resources_reset = (cluster_resources == available_resources)
def test_replenish_resources(self):
cluster_resources = ray.global_state.cluster_resources()
assert resources_reset
ray.get(cpu_task.remote(0))
start = time.time()
resources_reset = False
while not resources_reset and time.time() - start < self.timeout:
available_resources = ray.global_state.available_resources()
resources_reset = (cluster_resources == available_resources)
def test_uses_resources(ray_start):
cluster_resources = ray.global_state.cluster_resources()
assert resources_reset
@ray.remote
def cpu_task():
time.sleep(1)
def test_uses_resources(self):
cluster_resources = ray.global_state.cluster_resources()
task_id = cpu_task.remote(1)
start = time.time()
resource_used = False
cpu_task.remote()
resource_used = False
while not resource_used and time.time() - start < self.timeout:
available_resources = ray.global_state.available_resources()
resource_used = available_resources[
"CPU"] == cluster_resources["CPU"] - 1
start = time.time()
timeout = 10
while not resource_used and time.time() - start < timeout:
available_resources = ray.global_state.available_resources()
resource_used = available_resources[
"CPU"] == cluster_resources["CPU"] - 1
assert resource_used
ray.get(task_id) # clean up to reset resources
assert resource_used
+1 -9
View File
@@ -25,19 +25,11 @@ def shutdown_only():
class TestRedisPassword(object):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") != "on"
and os.environ.get("RAY_USE_XRAY"),
reason="Redis authentication works for raylet and old GCS.")
def test_exceptions(self, password, shutdown_only):
with pytest.raises(Exception):
ray.init(redis_password=password)
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="New GCS API doesn't support Redis authentication yet.")
@pytest.mark.skipif(
not os.environ.get("RAY_USE_XRAY"),
os.environ.get("RAY_USE_XRAY") == "0",
reason="Redis authentication is not supported in legacy Ray.")
def test_redis_password(self, password, shutdown_only):
# Workaround for https://github.com/ray-project/ray/issues/3045
+22 -11
View File
@@ -1223,7 +1223,7 @@ def _initialize_serialization(driver_id, worker=global_worker):
def get_address_info_from_redis_helper(redis_address,
node_ip_address,
use_raylet=False,
use_raylet=True,
redis_password=None):
redis_ip_address, redis_port = redis_address.split(":")
# For this command to work, some other client (on the same machine as
@@ -1333,7 +1333,7 @@ def get_address_info_from_redis_helper(redis_address,
def get_address_info_from_redis(redis_address,
node_ip_address,
num_retries=5,
use_raylet=False,
use_raylet=True,
redis_password=None):
counter = 0
while True:
@@ -1497,10 +1497,15 @@ def _init(address_info=None,
else:
driver_mode = SCRIPT_MODE
if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1":
# This environment variable is used in our testing setup.
logger.info("Detected environment variable 'RAY_USE_XRAY'.")
use_raylet = True
if use_raylet is None:
if os.environ.get("RAY_USE_XRAY") == "0":
# This environment variable is used in our testing setup.
logger.info("Detected environment variable 'RAY_USE_XRAY' with "
"value {}. This turns OFF xray.".format(
os.environ.get("RAY_USE_XRAY")))
use_raylet = False
else:
use_raylet = True
# Get addresses of existing services.
if address_info is None:
@@ -1762,10 +1767,16 @@ def init(redis_address=None,
else:
raise Exception("Perhaps you called ray.init twice by accident?")
if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1":
# This environment variable is used in our testing setup.
logger.info("Detected environment variable 'RAY_USE_XRAY'.")
use_raylet = True
if use_raylet is None:
if os.environ.get("RAY_USE_XRAY") == "0":
# This environment variable is used in our testing setup.
logger.info("Detected environment variable 'RAY_USE_XRAY' with "
"value {}. This turns OFF xray.".format(
os.environ.get("RAY_USE_XRAY")))
use_raylet = False
else:
use_raylet = True
if not use_raylet and redis_password is not None:
raise Exception("Setting the 'redis_password' argument is not "
"supported in legacy Ray. To run Ray with "
@@ -1993,7 +2004,7 @@ def connect(info,
object_id_seed=None,
mode=WORKER_MODE,
worker=global_worker,
use_raylet=False,
use_raylet=True,
redis_password=None):
"""Connect this worker to the local scheduler, to Plasma, and to Redis.