[WIP] Large changes to make the tests pass. (#376)

* Revert "Make tests more informative (#372)"

This reverts commit fd353250c8.

* fix bugs, in particular deactivate worker service on driver and remove condition variables

* changes to minimize the changes in this PR

* switch from faulty mutex synchronization to using atomics

* Increase the default size of the message queues, to accommodate exporting large numbers of remote functions. This is a temporary fix, but not a long term solution.

* Reorganize the scheduler export code to queue up exports. This does not solve the underlying problem yet, but sets up a solution.

* Start a separate thread on driver to print error messages by constantly querying the scheduler. This is a temporary solution because the solution based on starting a worker service for the driver which the scheduler can push error messages to is buggy.

* Fix segfault in taskcapsule destructor.

* Move tests for catching errors into a separate test file.

* Revert "roll back grpc (#368)"

This reverts commit c01ef95d04.
This commit is contained in:
Robert Nishihara
2016-08-15 11:02:54 -07:00
committed by Philipp Moritz
parent fd353250c8
commit 87bb7a8f67
16 changed files with 602 additions and 440 deletions
+11 -6
View File
@@ -2,7 +2,8 @@ import os
import sys
import time
import subprocess32 as subprocess
import numpy as np
import string
import random
# Ray modules
import config
@@ -21,7 +22,7 @@ def address(host, port):
return host + ":" + str(port)
def new_scheduler_port():
return np.random.randint(10000, 65536)
return random.randint(10000, 65535)
def cleanup():
"""When running in local mode, shutdown the Ray processes.
@@ -72,16 +73,16 @@ def start_objstore(scheduler_address, node_ip_address, cleanup):
scheduler_address (str): The ip address and port of the scheduler to connect
to.
node_ip_address (str): The ip address of the node running the object store.
The object store's port number will be chosen by the object store process.
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
this process will be killed by serices.cleanup() when the Python process
that imported services exits.
"""
p = subprocess.Popen(["objstore", scheduler_address, node_ip_address, "--log-file-prefix", config.get_log_file_path("")], env=_services_env)
random_string = "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
p = subprocess.Popen(["objstore", scheduler_address, node_ip_address, "--log-file-name", config.get_log_file_path("-".join(["objstore", random_string]) + ".log")], env=_services_env)
if cleanup:
all_processes.append(p)
def start_worker(node_ip_address, worker_path, scheduler_address, cleanup=True, user_source_directory=None):
def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, cleanup=True, user_source_directory=None):
"""This method starts a worker process.
Args:
@@ -90,6 +91,8 @@ def start_worker(node_ip_address, worker_path, scheduler_address, cleanup=True,
run.
scheduler_address (str): The ip address and port of the scheduler to connect
to.
objstore_address (Optional[str]): The ip address and port of the object
store to connect to.
cleanup (Optional[bool]): True if using Ray in local mode. If cleanup is
true, then this process will be killed by serices.cleanup() when the
Python process that imported services exits. This is True by default.
@@ -106,6 +109,8 @@ def start_worker(node_ip_address, worker_path, scheduler_address, cleanup=True,
"--node-ip-address=" + node_ip_address,
"--user-source-directory=" + user_source_directory,
"--scheduler-address=" + scheduler_address]
if objstore_address is not None:
command.append("--objstore-address=" + objstore_address)
p = subprocess.Popen(command)
if cleanup:
all_processes.append(p)
@@ -155,7 +160,7 @@ def start_workers(scheduler_address, objstore_address, num_workers, worker_path)
"""
node_ip_address = objstore_address.split(":")[0]
for _ in range(num_workers):
start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, cleanup=False)
start_worker(node_ip_address, worker_path, scheduler_address, cleanup=False)
def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0, worker_path=None):
"""Start Ray in local mode.
+47 -5
View File
@@ -9,6 +9,7 @@ import funcsigs
import numpy as np
import colorama
import atexit
import threading
# Ray modules
import config
@@ -368,9 +369,6 @@ class Worker(object):
eventually does call connect, if it is a driver, it will export these
functions to the scheduler. If cached_remote_functions is None, that means
that connect has been called already.
num_failed_tasks (int): The number of tasks that have failed and whose error
messages have been displayed to the user. We use this value to know when
a failed task hasn't been seen by the user and should be displayed.
"""
def __init__(self):
@@ -379,7 +377,6 @@ class Worker(object):
self.handle = None
self.mode = None
self.cached_remote_functions = []
self.num_failed_tasks = 0
def set_mode(self, mode):
"""Set the mode of the worker.
@@ -538,6 +535,9 @@ made by one task do not affect other tasks.
logger = logging.getLogger("ray")
"""Logger: The logging object for the Python worker code."""
class RayConnectionError(Exception):
pass
def check_connected(worker=global_worker):
"""Check if the worker is connected.
@@ -545,7 +545,7 @@ def check_connected(worker=global_worker):
Exception: An exception is raised if the worker is not connected.
"""
if worker.handle is None and worker.mode != raylib.PYTHON_MODE:
raise Exception("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.init(start_ray_local=True, num_workers=1)'.")
raise RayConnectionError("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.init(start_ray_local=True, num_workers=1)'.")
def print_failed_task(task_status):
"""Print information about failed tasks.
@@ -678,6 +678,37 @@ def cleanup(worker=global_worker):
atexit.register(cleanup)
def print_error_messages(worker=global_worker):
num_failed_tasks = 0
num_failed_remote_function_imports = 0
num_failed_reusable_variable_imports = 0
num_failed_reusable_variable_reinitializations = 0
while True:
try:
info = task_info(worker=worker)
# Print failed task errors.
for error in info["failed_tasks"][num_failed_tasks:]:
print error["error_message"]
num_failed_tasks = len(info["failed_tasks"])
# Print remote function import errors.
for error in info["failed_remote_function_imports"][num_failed_remote_function_imports:]:
print error["error_message"]
num_failed_remote_function_imports = len(info["failed_remote_function_imports"])
# Print reusable variable import errors.
for error in info["failed_reusable_variable_imports"][num_failed_reusable_variable_imports:]:
print error["error_message"]
num_failed_reusable_variable_imports = len(info["failed_reusable_variable_imports"])
# Print reusable variable reinitialization errors.
for error in info["failed_reinitialize_reusable_variables"][num_failed_reusable_variable_reinitializations:]:
print error["error_message"]
num_failed_reusable_variable_reinitializations = len(info["failed_reinitialize_reusable_variables"])
except RayConnectionError:
# When the driver is exiting, we set worker.handle to None, which will cause
# the check_connected call inside of task_info to raise an exception. We use
# the try block here to suppress that exception.
pass
time.sleep(0.2)
def connect(node_ip_address, scheduler_address, objstore_address=None, worker=global_worker, mode=raylib.WORKER_MODE):
"""Connect this worker to the scheduler and an object store.
@@ -702,6 +733,17 @@ def connect(node_ip_address, scheduler_address, objstore_address=None, worker=gl
# receive commands from the scheduler. This call also sets up a queue between
# the worker and the worker service.
worker.handle, worker.worker_address = raylib.create_worker(node_ip_address, scheduler_address, objstore_address if objstore_address is not None else "", mode)
# If this is a driver running in SCRIPT_MODE, start a thread to print error
# messages asynchronously in the background. Ideally the scheduler would push
# messages to the driver's worker service, but we ran into bugs when trying to
# properly shutdown the driver's worker service, so we are temporarily using
# this implementation which constantly queries the scheduler for new error
# messages.
if mode == raylib.SCRIPT_MODE:
t = threading.Thread(target=print_error_messages, args=(worker,))
# Making the thread a daemon causes it to exit when the main thread exits.
t.daemon = True
t.start()
worker.set_mode(mode)
FORMAT = "%(asctime)-15s %(message)s"
# Configure the Python logging module. Note that if we do not provide our own