[xray] Integrate worker.py with raylet. (#1810)

* Integrate worker with raylet.

* Begin allowing worker to attach to cluster.

* Fix linting and documentation.

* Fix linting.

* Comment tests back in.

* Fix type of worker command.

* Remove xray python files and tests.

* Fix from rebase.

* Add test.

* Copy over raylet executable.

* Small cleanup.
This commit is contained in:
Robert Nishihara
2018-04-03 02:38:56 -07:00
committed by Philipp Moritz
parent 0fc989c6c1
commit fbfbb1c079
22 changed files with 459 additions and 506 deletions
+160 -78
View File
@@ -31,6 +31,9 @@ import ray.plasma
from ray.utils import (FunctionProperties, random_string, binary_to_hex,
is_cython)
# Import flatbuffer bindings.
from ray.core.generated.ClientTableData import ClientTableData
SCRIPT_MODE = 0
WORKER_MODE = 1
PYTHON_MODE = 2
@@ -50,6 +53,7 @@ NIL_LOCAL_SCHEDULER_ID = NIL_ID
NIL_FUNCTION_ID = NIL_ID
NIL_ACTOR_ID = NIL_ID
NIL_ACTOR_HANDLE_ID = NIL_ID
NIL_CLIENT_ID = 20 * b"\xff"
# This must be kept in sync with the `error_types` array in
# common/state/error_table.h.
@@ -452,9 +456,12 @@ class Worker(object):
for object_id in object_ids]
for i in range(0, len(object_ids),
ray._config.worker_fetch_request_size()):
self.plasma_client.fetch(
plain_object_ids[i:(i +
ray._config.worker_fetch_request_size())])
if not self.use_raylet:
self.plasma_client.fetch(
plain_object_ids
[i:(i + ray._config.worker_fetch_request_size())])
else:
print("plasma_client.fetch has not been implemented yet")
# Get the objects. We initially try to get the objects immediately.
final_results = self.retrieve_and_deserialize(plain_object_ids, 0)
@@ -478,9 +485,12 @@ class Worker(object):
plasma.ObjectID, unready_ids.keys()))
for i in range(0, len(object_ids_to_fetch),
ray._config.worker_fetch_request_size()):
self.plasma_client.fetch(
object_ids_to_fetch[i:(
i + ray._config.worker_fetch_request_size())])
if not self.use_raylet:
self.plasma_client.fetch(
object_ids_to_fetch[i:(
i + ray._config.worker_fetch_request_size())])
else:
print("plasma_client.fetch has not been implemented yet")
results = self.retrieve_and_deserialize(
object_ids_to_fetch,
max([ray._config.get_timeout_milliseconds(),
@@ -496,7 +506,7 @@ class Worker(object):
# If there were objects that we weren't able to get locally, let the
# local scheduler know that we're now unblocked.
if was_blocked:
if was_blocked and not self.use_raylet:
self.local_scheduler_client.notify_unblocked()
assert len(final_results) == len(object_ids)
@@ -1150,70 +1160,108 @@ def _initialize_serialization(worker=global_worker):
use_dict=True)
def get_address_info_from_redis_helper(redis_address, node_ip_address):
def get_address_info_from_redis_helper(redis_address, node_ip_address,
use_raylet=False):
redis_ip_address, redis_port = redis_address.split(":")
# For this command to work, some other client (on the same machine as
# Redis) must have run "CONFIG SET protected-mode no".
redis_client = redis.StrictRedis(host=redis_ip_address,
port=int(redis_port))
# The client table prefix must be kept in sync with the file
# "src/common/redis_module/ray_redis_module.cc" where it is defined.
REDIS_CLIENT_TABLE_PREFIX = "CL:"
client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX))
# Filter to live clients on the same node and do some basic checking.
plasma_managers = []
local_schedulers = []
for key in client_keys:
info = redis_client.hgetall(key)
# Ignore clients that were deleted.
deleted = info[b"deleted"]
deleted = bool(int(deleted))
if deleted:
continue
if not use_raylet:
# The client table prefix must be kept in sync with the file
# "src/common/redis_module/ray_redis_module.cc" where it is defined.
REDIS_CLIENT_TABLE_PREFIX = "CL:"
client_keys = redis_client.keys(
"{}*".format(REDIS_CLIENT_TABLE_PREFIX))
# Filter to live clients on the same node and do some basic checking.
plasma_managers = []
local_schedulers = []
for key in client_keys:
info = redis_client.hgetall(key)
assert b"ray_client_id" in info
assert b"node_ip_address" in info
assert b"client_type" in info
client_node_ip_address = info[b"node_ip_address"].decode("ascii")
if (client_node_ip_address == node_ip_address or
(client_node_ip_address == "127.0.0.1" and
redis_ip_address == ray.services.get_node_ip_address())):
if info[b"client_type"].decode("ascii") == "plasma_manager":
plasma_managers.append(info)
elif info[b"client_type"].decode("ascii") == "local_scheduler":
local_schedulers.append(info)
# Make sure that we got at least one plasma manager and local scheduler.
assert len(plasma_managers) >= 1
assert len(local_schedulers) >= 1
# Build the address information.
object_store_addresses = []
for manager in plasma_managers:
address = manager[b"manager_address"].decode("ascii")
port = services.get_port(address)
object_store_addresses.append(
services.ObjectStoreAddress(
name=manager[b"store_socket_name"].decode("ascii"),
manager_name=manager[b"manager_socket_name"].decode("ascii"),
manager_port=port))
scheduler_names = [
scheduler[b"local_scheduler_socket_name"].decode("ascii")
for scheduler in local_schedulers]
client_info = {"node_ip_address": node_ip_address,
"redis_address": redis_address,
"object_store_addresses": object_store_addresses,
"local_scheduler_socket_names": scheduler_names,
# Web UI should be running.
"webui_url": _webui_url_helper(redis_client)}
return client_info
# Ignore clients that were deleted.
deleted = info[b"deleted"]
deleted = bool(int(deleted))
if deleted:
continue
assert b"ray_client_id" in info
assert b"node_ip_address" in info
assert b"client_type" in info
client_node_ip_address = info[b"node_ip_address"].decode("ascii")
if (client_node_ip_address == node_ip_address or
(client_node_ip_address == "127.0.0.1" and
redis_ip_address == ray.services.get_node_ip_address())):
if info[b"client_type"].decode("ascii") == "plasma_manager":
plasma_managers.append(info)
elif info[b"client_type"].decode("ascii") == "local_scheduler":
local_schedulers.append(info)
# Make sure that we got at least one plasma manager and local
# scheduler.
assert len(plasma_managers) >= 1
assert len(local_schedulers) >= 1
# Build the address information.
object_store_addresses = []
for manager in plasma_managers:
address = manager[b"manager_address"].decode("ascii")
port = services.get_port(address)
object_store_addresses.append(
services.ObjectStoreAddress(
name=manager[b"store_socket_name"].decode("ascii"),
manager_name=manager[b"manager_socket_name"].decode(
"ascii"),
manager_port=port))
scheduler_names = [
scheduler[b"local_scheduler_socket_name"].decode("ascii")
for scheduler in local_schedulers]
client_info = {"node_ip_address": node_ip_address,
"redis_address": redis_address,
"object_store_addresses": object_store_addresses,
"local_scheduler_socket_names": scheduler_names,
# Web UI should be running.
"webui_url": _webui_url_helper(redis_client)}
return client_info
# Handle the raylet case.
else:
# In the raylet code path, all client data is stored in a zset at the
# key for the nil client.
client_key = b"CLIENT:" + NIL_CLIENT_ID
clients = redis_client.zrange(client_key, 0, -1)
raylets = []
for client_message in clients:
client = ClientTableData.GetRootAsClientTableData(client_message,
0)
client_node_ip_address = client.NodeManagerAddress().decode(
"ascii")
if (client_node_ip_address == node_ip_address or
(client_node_ip_address == "127.0.0.1" and
redis_ip_address == ray.services.get_node_ip_address())):
raylets.append(client)
# TODO(rkn): The ObjectStoreSocketName field does not exist.
object_store_addresses = [
raylet.ObjectStoreSocketName().decode("ascii")
for raylet in raylets]
raylet_socket_names = [raylet.NodeManagerAddress().decode("ascii") for
raylet in raylets]
return {"node_ip_address": node_ip_address,
"redis_address": redis_address,
"object_store_addresses": object_store_addresses,
"raylet_socket_names": raylet_socket_names,
# Web UI should be running.
"webui_url": _webui_url_helper(redis_client)}
def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5):
def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5,
use_raylet=False):
counter = 0
while True:
try:
return get_address_info_from_redis_helper(redis_address,
node_ip_address)
node_ip_address,
use_raylet=use_raylet)
except Exception as e:
if counter == num_retries:
raise
@@ -1281,7 +1329,8 @@ def _init(address_info=None,
redis_max_clients=None,
plasma_directory=None,
huge_pages=False,
include_webui=True):
include_webui=True,
use_raylet=False):
"""Helper method to connect to an existing Ray cluster or start a new one.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1336,6 +1385,8 @@ def _init(address_info=None,
Store with hugetlbfs support. Requires plasma_directory.
include_webui: Boolean flag indicating whether to start the web
UI, which is a Jupyter notebook.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
Returns:
Address information about the started processes.
@@ -1402,7 +1453,8 @@ def _init(address_info=None,
redis_max_clients=redis_max_clients,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
include_webui=include_webui)
include_webui=include_webui,
use_raylet=use_raylet)
else:
if redis_address is None:
raise Exception("When connecting to an existing cluster, "
@@ -1439,7 +1491,8 @@ def _init(address_info=None,
node_ip_address = services.get_node_ip_address(redis_address)
# Get the address info of the processes to connect to from Redis.
address_info = get_address_info_from_redis(redis_address,
node_ip_address)
node_ip_address,
use_raylet=use_raylet)
# Connect this driver to Redis, the object store, and the local scheduler.
# Choose the first object store and local scheduler if there are multiple.
@@ -1453,13 +1506,17 @@ def _init(address_info=None,
"redis_address": address_info["redis_address"],
"store_socket_name": (
address_info["object_store_addresses"][0].name),
"manager_socket_name": (
address_info["object_store_addresses"][0].manager_name),
"local_scheduler_socket_name": (
address_info["local_scheduler_socket_names"][0]),
"webui_url": address_info["webui_url"]}
if not use_raylet:
driver_address_info["manager_socket_name"] = (
address_info["object_store_addresses"][0].manager_name)
driver_address_info["local_scheduler_socket_name"] = (
address_info["local_scheduler_socket_names"][0])
else:
driver_address_info["raylet_socket_name"] = (
address_info["raylet_socket_name"])
connect(driver_address_info, object_id_seed=object_id_seed,
mode=driver_mode, worker=global_worker)
mode=driver_mode, worker=global_worker, use_raylet=use_raylet)
return address_info
@@ -1469,7 +1526,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
num_cpus=None, num_gpus=None, resources=None,
num_custom_resource=None, num_redis_shards=None,
redis_max_clients=None, plasma_directory=None,
huge_pages=False, include_webui=True, object_store_memory=None):
huge_pages=False, include_webui=True, object_store_memory=None,
use_raylet=False):
"""Connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -1513,6 +1571,9 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
UI, which is a Jupyter notebook.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
Returns:
Address information about the started processes.
@@ -1539,7 +1600,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
include_webui=include_webui,
object_store_memory=object_store_memory)
object_store_memory=object_store_memory,
use_raylet=use_raylet)
def cleanup(worker=global_worker):
@@ -1818,7 +1880,8 @@ def import_thread(worker, mode):
pass
def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
use_raylet=False):
"""Connect this worker to the local scheduler, to Plasma, and to Redis.
Args:
@@ -1828,6 +1891,8 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
deterministic.
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE,
PYTHON_MODE, and SILENT_MODE.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
"""
check_main_thread()
# Do some basic checking to make sure we didn't call ray.init twice.
@@ -1842,6 +1907,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
worker.actor_id = NIL_ACTOR_ID
worker.connected = True
worker.set_mode(mode)
worker.use_raylet = use_raylet
# The worker.events field is used to aggregate logging information and
# display it in the web UI. Note that Python lists are protected by the GIL,
# which is important because we will append to this field from multiple
@@ -1909,8 +1975,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
"driver_id": worker.worker_id,
"start_time": time.time(),
"plasma_store_socket": info["store_socket_name"],
"plasma_manager_socket": info["manager_socket_name"],
"local_scheduler_socket": info["local_scheduler_socket_name"]}
"plasma_manager_socket": info.get("manager_socket_name"),
"local_scheduler_socket": info.get("local_scheduler_socket_name"),
"raylet_socket": info.get("raylet_socket_name")}
driver_info["name"] = (main.__file__ if hasattr(main, "__file__")
else "INTERACTIVE MODE")
worker.redis_client.hmset(b"Drivers:" + worker.worker_id, driver_info)
@@ -1933,11 +2000,22 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
raise Exception("This code should be unreachable.")
# Create an object store client.
worker.plasma_client = plasma.connect(info["store_socket_name"],
info["manager_socket_name"],
64)
if not worker.use_raylet:
worker.plasma_client = plasma.connect(info["store_socket_name"],
info["manager_socket_name"],
64)
else:
worker.plasma_client = plasma.connect(info["store_socket_name"],
"",
64)
if not worker.use_raylet:
local_scheduler_socket = info["local_scheduler_socket_name"]
else:
local_scheduler_socket = info["raylet_socket_name"]
worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient(
info["local_scheduler_socket_name"], worker.worker_id, is_worker)
local_scheduler_socket, worker.worker_id, is_worker)
# If this is a driver, set the current task ID, the task driver ID, and set
# the task index to 0.
@@ -2275,9 +2353,10 @@ def flush_log(worker=global_worker):
"""Send the logged worker events to the global state store."""
event_log_key = b"event_log:" + worker.worker_id
event_log_value = json.dumps(worker.events)
worker.local_scheduler_client.log_event(event_log_key,
event_log_value,
time.time())
if not worker.use_raylet:
worker.local_scheduler_client.log_event(event_log_key,
event_log_value,
time.time())
worker.events = []
@@ -2367,6 +2446,9 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
A list of object IDs that are ready and a list of the remaining object
IDs.
"""
if worker.use_raylet:
print("plasma_client.wait has not been implemented yet")
return
if isinstance(object_ids, ray.local_scheduler.ObjectID):
raise TypeError(