Update worker.py and services.py to use plasma and the local scheduler. (#19)

* Update worker code and services code to use plasma and the local scheduler.

* Cleanups.

* Fix bug in which threads were started before the worker mode was set. This caused remote functions to be defined on workers before the worker knew it was in WORKER_MODE.

* Fix bug in install-dependencies.sh.

* Lengthen timeout in failure_test.py.

* Cleanups.

* Cleanup services.start_ray_local.

* Clean up random name generation.

* Cleanups.
This commit is contained in:
Robert Nishihara
2016-11-02 00:39:35 -07:00
committed by Philipp Moritz
parent 2068587af8
commit 072f442c1f
20 changed files with 625 additions and 1210 deletions
+2 -4
View File
@@ -11,8 +11,6 @@ if hasattr(ctypes, "windll"):
import config
import serialization
from worker import scheduler_info, register_class, visualize_computation_graph, task_info, init, connect, disconnect, get, put, wait, remote, kill_workers, restart_workers_local
from worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote
from worker import Reusable, reusables
from libraylib import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE
from libraylib import ObjectID
import internal
from worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE
+25
View File
@@ -0,0 +1,25 @@
from __future__ import print_function
import sys
import argparse
import numpy as np
import ray
# Command-line options describing the already-running Ray processes that this
# worker should attach to. Only the node IP address and the Redis port are
# mandatory; the three name options default to None when omitted.
parser = argparse.ArgumentParser(
    description="Parse addresses for the worker to connect to.")
parser.add_argument(
    "--node-ip-address", required=True, type=str,
    help="the ip address of the worker's node")
parser.add_argument(
    "--redis-port", required=True, type=int,
    help="the port to use for Redis")
parser.add_argument(
    "--object-store-name", type=str,
    help="the object store's name")
parser.add_argument(
    "--object-store-manager-name", type=str,
    help="the object store manager's name")
parser.add_argument(
    "--local-scheduler-name", type=str,
    help="the local scheduler's name")

if __name__ == "__main__":
    args = parser.parse_args()
    # Repackage the parsed options under the keys handed to ray.worker.connect.
    address_info = dict(
        node_ip_address=args.node_ip_address,
        redis_port=args.redis_port,
        object_store_name=args.object_store_name,
        object_store_manager_name=args.object_store_manager_name,
        local_scheduler_name=args.local_scheduler_name,
    )
    # Connect in worker mode first, then enter the worker's main loop.
    ray.worker.connect(address_info, ray.WORKER_MODE)
    ray.worker.main_loop()
-34
View File
@@ -1,34 +0,0 @@
# Utilities to deal with computation graphs
import graphviz
def graph_to_graphviz(computation_graph):
    """
    Convert the computation graph to graphviz format.

    A box-shaped "op-root" node is always created. Every operation i in
    computation_graph.operation is then drawn as a box-shaped node "op<i>":
    tasks are labelled with the last dotted component of the task name and get
    an edge to each of their results, puts get an edge to the object they put,
    and gets are drawn as a bare node.

    Args:
      computation_graph [graph_pb2.CompGraph]: protocol buffer description of
        the computation graph

    Returns:
      Graphviz description of the computation graph
    """
    # NOTE(review): the nesting below was reconstructed from a listing whose
    # indentation had been stripped -- confirm that the creator edge and the
    # argument loop are meant to run for every operation.
    dot = graphviz.Digraph(format="pdf")
    dot.node("op-root", shape="box")
    for (i, op) in enumerate(computation_graph.operation):
        if op.HasField("task"):
            dot.node("op" + str(i), shape="box", label=str(i) + "\n" + op.task.name.split(".")[-1])
            for res in op.task.result:
                dot.edge("op" + str(i), str(res))
        elif op.HasField("put"):
            dot.node("op" + str(i), shape="box", label=str(i) + "\n" + "put")
            dot.edge("op" + str(i), str(op.put.objectid))
        elif op.HasField("get"):
            dot.node("op" + str(i), shape="box", label=str(i) + "\n" + "get")
        # A creator id of 2 ** 64 - 1 is mapped to "-root", so the dotted edge
        # starts at the "op-root" node created above.
        creator_operationid = op.creator_operationid if op.creator_operationid != 2 ** 64 - 1 else "-root"
        dot.edge("op" + str(creator_operationid), "op" + str(i), style="dotted", constraint="false")
        for arg in op.task.arg:
            # Arguments with an empty serialized_arg are drawn as object nodes
            # feeding this operation (presumably these are the arguments passed
            # by object id rather than by value -- TODO confirm).
            if len(arg.serialized_arg) == 0:
                dot.node(str(arg.objectid))
                dot.edge(str(arg.objectid), "op" + str(i))
    return dot
View File
+2 -82
View File
@@ -1,89 +1,9 @@
from __future__ import print_function
import numpy as np
import pickling
import libraylib as raylib
import numbuf
def is_argument_serializable(value):
    """Checks if value is a composition of primitive types.

    This will return True if the argument is one of the following:
      - An int (or a long on Python 2)
      - A finite float
      - A bool
      - None
      - A list of length at most 100 whose elements are serializable
      - A tuple of length at most 100 whose elements are serializable
      - A dict of length at most 100 whose keys and values are serializable
      - A string of length at most 100.
      - A unicode string of length at most 100.

    Types are matched exactly, so instances of subclasses of the types above
    are rejected.

    Args:
      value: A Python object.

    Returns:
      True if the object can be serialized as a composition of primitive types
      and False otherwise.
    """
    import math

    # `long` and `unicode` only exist on Python 2; fall back gracefully so the
    # check also works on Python 3.
    try:
        integer_types = (int, long)
        text_types = (str, unicode)
    except NameError:
        integer_types = (int,)
        text_types = (str,)
    max_length = 100

    t = type(value)
    if value is None or t is bool or t in integer_types:
        return True
    if t is float:
        # repr() of inf/nan is "inf"/"nan", which cannot be evaluated back into
        # a float, so non-finite floats must not be passed by value.
        return not (math.isnan(value) or math.isinf(value))
    if t in text_types:
        return len(value) <= max_length
    if t is list or t is tuple:
        return (len(value) <= max_length and
                all(is_argument_serializable(element) for element in value))
    if t is dict:
        return (len(value) <= max_length and
                all(is_argument_serializable(k) and is_argument_serializable(v)
                    for k, v in value.items()))
    return False
def serialize_argument_if_possible(value):
    """Serialize an argument that is passed by value.

    The result is meant to be deserialized by deserialize_argument.

    Args:
      value: A Python object.

    Returns:
      The repr of value as a string, or None if value is not a composition of
      primitive types (see is_argument_serializable) or if its repr is longer
      than 1000 characters.
    """
    # Only values that are obviously reproducible from their repr are passed by
    # value.
    if is_argument_serializable(value):
        text = repr(value)
        # Anything bigger than this is too large to pass by value.
        if len(text) <= 1000:
            return text
    return None
def deserialize_argument(serialized_value):
    """Deserialize an argument that was passed by value.

    The argument will have been serialized by serialize_argument_if_possible,
    i.e. it is the repr of a composition of ints, floats, bools, None, strings,
    lists, tuples and dicts.

    Args:
      serialized_value (str): The repr string to turn back into a value.

    Returns:
      The Python value described by serialized_value.

    Raises:
      ValueError or SyntaxError: If serialized_value is not a plain Python
        literal.
    """
    import ast

    # The string arrives from another process, so it is parsed strictly as a
    # literal instead of being handed to eval(), which would execute arbitrary
    # expressions. Every type the serializer emits is a valid literal.
    return ast.literal_eval(serialized_value)
def check_serializable(cls):
"""Throws an exception if Ray cannot serialize this class efficiently.
+80 -115
View File
@@ -1,31 +1,29 @@
from __future__ import print_function
import os
import sys
import time
import subprocess32 as subprocess
import subprocess
import string
import random
# Ray modules
import config
_services_env = os.environ.copy()
_services_env["PATH"] = os.pathsep.join([os.path.dirname(os.path.abspath(__file__)), _services_env["PATH"]])
# Make GRPC only print error messages.
_services_env["GRPC_VERBOSITY"] = "ERROR"
# all_processes is a list of the scheduler, object store, and worker processes
# that have been started by this services module if Ray is being used in local
# mode.
all_processes = []
TIMEOUT_SECONDS = 5
def address(host, port):
    """Join a host string and a port into a single "host:port" string."""
    return ":".join((host, str(port)))
def new_scheduler_port():
def new_port():
    """Pick a random port number between 10000 and 65535, inclusive.

    The port is not checked for availability.
    """
    lowest, highest = 10000, 65535
    return random.randint(lowest, highest)
def random_name():
    """Return a random integer between 0 and 99999999, inclusive, as a string."""
    suffix = random.randint(0, 99999999)
    return str(suffix)
def cleanup():
"""When running in local mode, shutdown the Ray processes.
@@ -36,7 +34,8 @@ def cleanup():
"""
global all_processes
successfully_shut_down = True
for p in all_processes:
# Terminate the processes in reverse order.
for p in all_processes[::-1]:
if p.poll() is not None: # process has already terminated
continue
p.kill()
@@ -49,146 +48,112 @@ def cleanup():
continue
successfully_shut_down = False
if successfully_shut_down:
print "Successfully shut down Ray."
print("Successfully shut down Ray.")
else:
print "Ray did not shut down properly."
print("Ray did not shut down properly.")
all_processes = []
def start_scheduler(scheduler_address, cleanup):
"""This method starts a scheduler process.
Args:
scheduler_address (str): The ip address and port to use for the scheduler.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
this process will be killed by services.cleanup() when the Python process
that imported services exits.
"""
scheduler_port = scheduler_address.split(":")[1]
p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler-" + scheduler_port + ".log")], env=_services_env)
def start_redis(port, cleanup=True):
    """Start a Redis server process listening on the given port.

    Args:
      port (int): The port that the Redis server should listen on.
      cleanup (bool): If cleanup is true, the process is recorded in
        all_processes so that services.cleanup() kills it. This is True by
        default, which matches the previous behavior: the old code tested the
        module-level cleanup function, which is always truthy.
    """
    redis_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../common/thirdparty/redis-3.2.3/src/redis-server")
    p = subprocess.Popen([redis_filepath, "--port", str(port), "--loglevel", "warning"])
    if cleanup:
        all_processes.append(p)
def start_objstore(scheduler_address, node_ip_address, cleanup):
def start_local_scheduler(redis_address, plasma_store_name, cleanup=True):
    """Start a local scheduler process.

    Args:
      redis_address (str): The address of Redis, passed to the scheduler with
        the -r flag.
      plasma_store_name (str): The name of the plasma store, passed to the
        scheduler with the -p flag.
      cleanup (bool): If cleanup is true, the process is recorded in
        all_processes so that services.cleanup() kills it. This is True by
        default, which matches the previous behavior: the old code tested the
        module-level cleanup function, which is always truthy.

    Returns:
      The randomly generated "/tmp/scheduler<number>" name that was passed to
      the scheduler with the -s flag.
    """
    local_scheduler_filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../photon/build/photon_scheduler")
    local_scheduler_name = "/tmp/scheduler{}".format(random_name())
    p = subprocess.Popen([local_scheduler_filepath, "-s", local_scheduler_name, "-r", redis_address, "-p", plasma_store_name])
    if cleanup:
        all_processes.append(p)
    return local_scheduler_name
def start_objstore(node_ip_address, redis_address, cleanup):
"""This method starts an object store process.
Args:
scheduler_address (str): The ip address and port of the scheduler to connect
to.
node_ip_address (str): The ip address of the node running the object store.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
this process will be killed by services.cleanup() when the Python process
that imported services exits.
"""
random_string = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
p = subprocess.Popen(["objstore", scheduler_address, node_ip_address, "--log-file-name", config.get_log_file_path("-".join(["objstore", random_string]) + ".log")], env=_services_env)
if cleanup:
all_processes.append(p)
plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_store")
store_name = "/tmp/ray_plasma_store{}".format(random_name())
p1 = subprocess.Popen([plasma_store_executable, "-s", store_name])
def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, cleanup=True):
plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../plasma/build/plasma_manager")
manager_name = "/tmp/ray_plasma_manager{}".format(random_name())
manager_port = new_port()
p2 = subprocess.Popen([plasma_manager_executable,
"-s", store_name,
"-m", manager_name,
"-h", node_ip_address,
"-p", str(manager_port),
"-r", redis_address])
if cleanup:
all_processes.append(p1)
all_processes.append(p2)
return store_name, manager_name, manager_port
def start_worker(address_info, worker_path, cleanup=True):
"""This method starts a worker process.
Args:
node_ip_address (str): The IP address of the node that the worker runs on.
address_info (dict): This dictionary contains the node_ip_address,
redis_port, object_store_name, object_store_manager_name, and
local_scheduler_name.
worker_path (str): The path of the source code which the worker process will
run.
scheduler_address (str): The ip address and port of the scheduler to connect
to.
objstore_address (Optional[str]): The ip address and port of the object
store to connect to.
cleanup (Optional[bool]): True if using Ray in local mode. If cleanup is
true, then this process will be killed by services.cleanup() when the
Python process that imported services exits. This is True by default.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
this process will be killed by services.cleanup() when the Python process
that imported services exits. This is True by default.
"""
command = ["python",
worker_path,
"--node-ip-address=" + node_ip_address,
"--scheduler-address=" + scheduler_address]
if objstore_address is not None:
command.append("--objstore-address=" + objstore_address)
"--node-ip-address=" + address_info["node_ip_address"],
"--object-store-name=" + address_info["object_store_name"],
"--object-store-manager-name=" + address_info["object_store_manager_name"],
"--local-scheduler-name=" + address_info["local_scheduler_name"],
"--redis-port=" + str(address_info["redis_port"])]
p = subprocess.Popen(command)
if cleanup:
all_processes.append(p)
def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None, cleanup=False):
    """Start an object store and associated workers in the cluster setting.

    This starts an object store and the associated workers when Ray is being used
    in the cluster setting. This assumes the scheduler has already been started.

    Args:
      scheduler_address (str): IP address and port of the scheduler (which may run
        on a different node).
      node_ip_address (str): IP address (without port) of the node this function
        is run on.
      num_workers (int): The number of workers to be started on this node.
      worker_path (str): Path of the Python worker script that will be run on the
        worker. If None, scripts/default_worker.py (located relative to this
        file) is used.
      cleanup (bool): If cleanup is True, then the processes started by this
        command will be killed when the process that imported services exits.
    """
    start_objstore(scheduler_address, node_ip_address, cleanup=cleanup)
    # Pause briefly after launching the object store before launching workers.
    time.sleep(0.2)
    if worker_path is None:
        worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
    for _ in range(num_workers):
        start_worker(node_ip_address, worker_path, scheduler_address, cleanup=cleanup)
    # NOTE(review): the listing this was recovered from had no indentation;
    # this sleep is assumed to run once after all workers are launched rather
    # than once per worker -- confirm.
    time.sleep(0.5)
def start_workers(scheduler_address, objstore_address, num_workers, worker_path):
    """Launch an additional batch of workers on this node.

    The scheduler and this node's object store are expected to be running
    already. The typical use is refreshing worker code: a developer kills the
    existing workers and then calls this to start replacements.

    Args:
      scheduler_address (str): ip address and port of the scheduler (which may
        run on a different node)
      objstore_address (str): ip address and port of the object store (which
        runs on the same node)
      num_workers (int): the number of workers to be started on this node
      worker_path (str): path of the source code that will be run on the worker
    """
    # The node's IP address is whatever precedes the first ":" of the object
    # store address.
    node_ip_address = objstore_address.partition(":")[0]
    remaining = num_workers
    while remaining > 0:
        # These workers are not registered for cleanup.
        start_worker(node_ip_address, worker_path, scheduler_address, cleanup=False)
        remaining -= 1
def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0, worker_path=None):
def start_ray_local(node_ip_address="127.0.0.1", num_workers=0, worker_path=None):
"""Start Ray in local mode.
This method starts Ray in local mode (as opposed to cluster mode, which is
handled by cluster.py).
Args:
num_objstores (int): The number of object stores to start. Aside from
testing, this should be one.
num_workers (int): The number of workers to start.
worker_path (str): The path of the source code that will be run by the
worker.
Returns:
The address of the scheduler and the addresses of all of the object stores.
This returns a dictionary of address information with the keys
node_ip_address, redis_port, object_store_name, object_store_manager_name,
and local_scheduler_name.
"""
if worker_path is None:
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
if num_objstores < 1:
raise Exception("`num_objstores` is {}, but should be at least 1.".format(num_objstores))
scheduler_address = address(node_ip_address, new_scheduler_port())
start_scheduler(scheduler_address, cleanup=True)
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "default_worker.py")
# Start Redis.
redis_port = new_port()
redis_address = address(node_ip_address, redis_port)
start_redis(redis_port)
time.sleep(0.1)
# create objstores
for i in range(num_objstores):
start_objstore(scheduler_address, node_ip_address, cleanup=True)
time.sleep(0.2)
if i < num_objstores - 1:
num_workers_to_start = num_workers / num_objstores
else:
# In case num_workers is not divisible by num_objstores, start the correct
# remaining number of workers.
num_workers_to_start = num_workers - (num_objstores - 1) * (num_workers / num_objstores)
for _ in range(num_workers_to_start):
start_worker(node_ip_address, worker_path, scheduler_address, cleanup=True)
time.sleep(0.3)
return scheduler_address
# Start Plasma.
object_store_name, object_store_manager_name, object_store_manager_port = start_objstore(node_ip_address, redis_address, cleanup=True)
# Start the local scheduler.
time.sleep(0.1)
local_scheduler_name = start_local_scheduler(redis_address, object_store_name)
time.sleep(0.2)
# Aggregate the address information together.
address_info = {"node_ip_address": node_ip_address,
"redis_port": redis_port,
"object_store_name": object_store_name,
"object_store_manager_name": object_store_manager_name,
"local_scheduler_name": local_scheduler_name}
# Start the workers.
for _ in range(num_workers):
start_worker(address_info, worker_path, cleanup=True)
time.sleep(0.3)
# Return the addresses of the relevant processes.
return address_info
+432 -454
View File
File diff suppressed because it is too large Load Diff