From a7ddac6fb1b943463abc143ab58bcc99ab731558 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sat, 4 Mar 2017 23:02:56 -0800 Subject: [PATCH] Properly mock ray submodules when building documentation. (#337) --- .travis.yml | 1 - .travis/install-dependencies.sh | 2 -- doc/source/conf.py | 6 ++++ python/ray/__init__.py | 2 -- python/ray/actor.py | 8 ++--- python/ray/serialization.py | 12 +++++-- python/ray/services.py | 45 ++++++++++++++------------ python/ray/worker.py | 57 ++++++++++++++++++--------------- 8 files changed, 74 insertions(+), 59 deletions(-) diff --git a/.travis.yml b/.travis.yml index 591b8b9be..414221f5a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,7 +31,6 @@ matrix: # Try generating Sphinx documentation. To do this, we need to install # Ray first. - ./.travis/install-dependencies.sh - - ./.travis/install-ray.sh - export PATH="$HOME/miniconda/bin:$PATH" - cd doc - pip install -r requirements-doc.txt diff --git a/.travis/install-dependencies.sh b/.travis/install-dependencies.sh index 0507ebe66..c0de83c73 100755 --- a/.travis/install-dependencies.sh +++ b/.travis/install-dependencies.sh @@ -64,8 +64,6 @@ elif [[ "$LINT" == "1" ]]; then # Install miniconda. wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh bash miniconda.sh -b -p $HOME/miniconda - export PATH="$HOME/miniconda/bin:$PATH" - pip install numpy cloudpickle funcsigs colorama psutil redis else echo "Unrecognized environment." exit 1 diff --git a/doc/source/conf.py b/doc/source/conf.py index 2963d0249..3f162dc25 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -16,6 +16,12 @@ import sys import os import shlex +# These lines added to enable Sphinx to work without installing Ray. +import mock +MOCK_MODULES = ["ray.numbuf", "ray.local_scheduler", "ray.plasma"] +for mod_name in MOCK_MODULES: + sys.modules[mod_name] = mock.Mock() + # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 44f4af054..e25b8b4fa 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -13,8 +13,6 @@ if hasattr(ctypes, "windll"): # This is done by associating all child processes with a "job" object that imposes this behavior. (lambda kernel32: (lambda job: (lambda n: kernel32.SetInformationJobObject(job, 9, "\0" * 17 + chr(0x8 | 0x4 | 0x20) + "\0" * (n - 18), n))(0x90 if ctypes.sizeof(ctypes.c_void_p) > ctypes.sizeof(ctypes.c_int) else 0x70) and kernel32.AssignProcessToJobObject(job, ctypes.c_void_p(kernel32.GetCurrentProcess())))(ctypes.c_void_p(kernel32.CreateJobObjectW(None, None))) if kernel32 is not None else None)(ctypes.windll.kernel32) -import ray.experimental -import ray.serialization from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log from ray.actor import actor from ray.actor import get_gpu_ids diff --git a/python/ray/actor.py b/python/ray/actor.py index 40dc95492..daae1fa0a 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -9,7 +9,7 @@ import numpy as np import random import traceback -import ray.local_scheduler as local_scheduler +import ray.local_scheduler import ray.pickling as pickling import ray.worker import ray.experimental.state as state @@ -30,7 +30,7 @@ def random_string(): return np.random.bytes(20) def random_actor_id(): - return local_scheduler.ObjectID(random_string()) + return ray.local_scheduler.ObjectID(random_string()) def get_actor_method_function_id(attr): """Get the function ID corresponding to an actor method. @@ -45,13 +45,13 @@ def get_actor_method_function_id(attr): function_id_hash.update(attr.encode("ascii")) function_id = function_id_hash.digest() assert len(function_id) == 20 - return local_scheduler.ObjectID(function_id) + return ray.local_scheduler.ObjectID(function_id) def fetch_and_register_actor(key, worker): """Import an actor.""" driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids, actor_method_names = \ worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids", "actor_method_names"]) - actor_id = local_scheduler.ObjectID(actor_id_str) + actor_id = ray.local_scheduler.ObjectID(actor_id_str) actor_name = actor_name.decode("ascii") module = module.decode("ascii") actor_method_names = json.loads(actor_method_names.decode("ascii")) diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 9f5228824..1a707b269 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -4,7 +4,7 @@ from __future__ import print_function import numpy as np -import ray.numbuf as numbuf +import ray.numbuf import ray.pickling as pickling def check_serializable(cls): @@ -139,5 +139,11 @@ def deserialize(serialized_obj): obj.__dict__.update(serialized_obj) return obj -# Register the callbacks with numbuf. -numbuf.register_callbacks(serialize, deserialize) +def set_callbacks(): + """Register the custom callbacks with numbuf. + + The serialize callback is used to serialize objects that numbuf does not know + how to serialize (for example custom Python classes). The deserialize callback + is used to serialize objects that were serialized by the serialize callback. + """ + ray.numbuf.register_callbacks(serialize, deserialize) diff --git a/python/ray/services.py b/python/ray/services.py index b50049553..dbc1e8fe2 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -17,8 +17,8 @@ import time import threading # Ray modules -import ray.local_scheduler as local_scheduler -import ray.plasma as plasma +import ray.local_scheduler +import ray.plasma import ray.global_scheduler as global_scheduler PROCESS_TYPE_MONITOR = "monitor" @@ -417,7 +417,7 @@ def start_local_scheduler(redis_address, if num_gpus is None: # By default, assume this node has no GPUs. num_gpus = 0 - local_scheduler_name, p = local_scheduler.start_local_scheduler( + local_scheduler_name, p = ray.local_scheduler.start_local_scheduler( plasma_store_name, plasma_manager_name, worker_path=worker_path, @@ -489,28 +489,31 @@ def start_objstore(node_ip_address, redis_address, object_manager_port=None, else: objstore_memory = int(system_memory * 0.8) # Start the Plasma store. - plasma_store_name, p1 = plasma.start_plasma_store(plasma_store_memory=objstore_memory, - use_profiler=RUN_PLASMA_STORE_PROFILER, - stdout_file=store_stdout_file, - stderr_file=store_stderr_file) + plasma_store_name, p1 = ray.plasma.start_plasma_store( + plasma_store_memory=objstore_memory, + use_profiler=RUN_PLASMA_STORE_PROFILER, + stdout_file=store_stdout_file, + stderr_file=store_stderr_file) # Start the plasma manager. if object_manager_port is not None: - plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, - redis_address, - plasma_manager_port=object_manager_port, - node_ip_address=node_ip_address, - num_retries=1, - run_profiler=RUN_PLASMA_MANAGER_PROFILER, - stdout_file=manager_stdout_file, - stderr_file=manager_stderr_file) + plasma_manager_name, p2, plasma_manager_port = ray.plasma.start_plasma_manager( + plasma_store_name, + redis_address, + plasma_manager_port=object_manager_port, + node_ip_address=node_ip_address, + num_retries=1, + run_profiler=RUN_PLASMA_MANAGER_PROFILER, + stdout_file=manager_stdout_file, + stderr_file=manager_stderr_file) assert plasma_manager_port == object_manager_port else: - plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, - redis_address, - node_ip_address=node_ip_address, - run_profiler=RUN_PLASMA_MANAGER_PROFILER, - stdout_file=manager_stdout_file, - stderr_file=manager_stderr_file) + plasma_manager_name, p2, plasma_manager_port = ray.plasma.start_plasma_manager( + plasma_store_name, + redis_address, + node_ip_address=node_ip_address, + run_profiler=RUN_PLASMA_MANAGER_PROFILER, + stdout_file=manager_stdout_file, + stderr_file=manager_stderr_file) if cleanup: all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1) all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2) diff --git a/python/ray/worker.py b/python/ray/worker.py index 9ce571ec7..f17796c85 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -25,9 +25,9 @@ import traceback import ray.pickling as pickling import ray.serialization as serialization import ray.services as services -import ray.numbuf as numbuf -import ray.local_scheduler as local_scheduler -import ray.plasma as plasma +import ray.numbuf +import ray.local_scheduler +import ray.plasma SCRIPT_MODE = 0 WORKER_MODE = 1 @@ -53,7 +53,7 @@ def random_string(): return np.random.bytes(20) def random_object_id(): - return local_scheduler.ObjectID(random_string()) + return ray.local_scheduler.ObjectID(random_string()) class FunctionID(object): def __init__(self, function_id): @@ -78,7 +78,7 @@ def numbuf_serialize(value): The serialized object. """ assert len(contained_objectids) == 0, "This should be unreachable." - return numbuf.serialize_list([value]) + return ray.numbuf.serialize_list([value]) class RayTaskError(Exception): """An object used internally to represent a task that threw an exception. @@ -439,8 +439,8 @@ class Worker(object): """ # Serialize and put the object in the object store. try: - numbuf.store_list(objectid.id(), self.plasma_client.conn, [value]) - except numbuf.numbuf_plasma_object_exists_error as e: + ray.numbuf.store_list(objectid.id(), self.plasma_client.conn, [value]) + except ray.numbuf.numbuf_plasma_object_exists_error as e: # The object already exists in the object store, so there is no need to # add it again. TODO(rkn): We need to compare the hashes and make sure # that the objects are in fact the same. We also should return an error @@ -465,7 +465,7 @@ class Worker(object): self.plasma_client.fetch([object_id.id() for object_id in object_ids]) # Get the objects. We initially try to get the objects immediately. - final_results = numbuf.retrieve_list( + final_results = ray.numbuf.retrieve_list( [object_id.id() for object_id in object_ids], self.plasma_client.conn, 0) @@ -482,7 +482,7 @@ class Worker(object): # Do another fetch for objects that aren't available locally yet, in case # they were evicted since the last fetch. self.plasma_client.fetch(list(unready_ids.keys())) - results = numbuf.retrieve_list(list(unready_ids.keys()), + results = ray.numbuf.retrieve_list(list(unready_ids.keys()), self.plasma_client.conn, GET_TIMEOUT_MILLISECONDS) # Remove any entries for objects we received during this iteration so we @@ -504,7 +504,7 @@ class Worker(object): assert final_results[i][0] == object_ids[i].id() return [result[1][0] for result in final_results] - def submit_task(self, function_id, func_name, args, actor_id=local_scheduler.ObjectID(NIL_ACTOR_ID)): + def submit_task(self, function_id, func_name, args, actor_id=None): """Submit a remote task to the scheduler. Tell the scheduler to schedule the execution of the function with name @@ -519,13 +519,14 @@ class Worker(object): """ with log_span("ray:submit_task", worker=self): check_main_thread() + actor_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID) if actor_id is None else actor_id # Put large or complex arguments that are passed by value in the object # store first. args_for_local_scheduler = [] for arg in args: - if isinstance(arg, local_scheduler.ObjectID): + if isinstance(arg, ray.local_scheduler.ObjectID): args_for_local_scheduler.append(arg) - elif local_scheduler.check_simple_value(arg): + elif ray.local_scheduler.check_simple_value(arg): args_for_local_scheduler.append(arg) else: args_for_local_scheduler.append(put(arg)) @@ -534,9 +535,9 @@ class Worker(object): num_return_vals, num_cpus, num_gpus = self.function_properties[self.task_driver_id.id()][function_id.id()] # Submit the task to local scheduler. - task = local_scheduler.Task( + task = ray.local_scheduler.Task( self.task_driver_id, - local_scheduler.ObjectID(function_id.id()), + ray.local_scheduler.ObjectID(function_id.id()), args_for_local_scheduler, num_return_vals, self.current_task_id, @@ -686,14 +687,18 @@ def initialize_numbuf(worker=global_worker): This defines a custom serializer for object IDs and also tells numbuf to serialize several exception classes that we define for error handling. """ + ray.serialization.set_callbacks() # Define a custom serializer and deserializer for handling Object IDs. def objectid_custom_serializer(obj): class_identifier = serialization.class_identifier(type(obj)) contained_objectids.append(obj) return obj.id() def objectid_custom_deserializer(serialized_obj): - return local_scheduler.ObjectID(serialized_obj) - serialization.add_class_to_whitelist(local_scheduler.ObjectID, pickle=False, custom_serializer=objectid_custom_serializer, custom_deserializer=objectid_custom_deserializer) + return ray.local_scheduler.ObjectID(serialized_obj) + serialization.add_class_to_whitelist(ray.local_scheduler.ObjectID, + pickle=False, + custom_serializer=objectid_custom_serializer, + custom_deserializer=objectid_custom_deserializer) if worker.mode in [SCRIPT_MODE, SILENT_MODE]: # These should only be called on the driver because register_class will @@ -1045,7 +1050,7 @@ def fetch_and_register_remote_function(key, worker=global_worker): "module", "num_cpus", "num_gpus"]) - function_id = local_scheduler.ObjectID(function_id_str) + function_id = ray.local_scheduler.ObjectID(function_id_str) function_name = function_name.decode("ascii") num_return_vals = int(num_return_vals) num_cpus = int(num_cpus) @@ -1211,9 +1216,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a worker.redis_client = redis.StrictRedis(host=redis_ip_address, port=int(redis_port)) worker.lock = threading.Lock() # Create an object store client. - worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"]) + worker.plasma_client = ray.plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"]) # Create the local scheduler client. - worker.local_scheduler_client = local_scheduler.LocalSchedulerClient(info["local_scheduler_socket_name"], worker.actor_id) + worker.local_scheduler_client = ray.local_scheduler.LocalSchedulerClient(info["local_scheduler_socket_name"], worker.actor_id) # Register the worker with Redis. if mode in [SCRIPT_MODE, SILENT_MODE]: # The concept of a driver is the same as the concept of a "job". Register @@ -1249,12 +1254,12 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a else: # Try to use true randomness. np.random.seed(None) - worker.current_task_id = local_scheduler.ObjectID(np.random.bytes(20)) + worker.current_task_id = ray.local_scheduler.ObjectID(np.random.bytes(20)) # When tasks are executed on remote workers in the context of multiple # drivers, the task driver ID is used to keep track of which driver is # responsible for the task so that error messages will be propagated to the # correct driver. - worker.task_driver_id = local_scheduler.ObjectID(worker.worker_id) + worker.task_driver_id = ray.local_scheduler.ObjectID(worker.worker_id) # Reset the state of the numpy random number generator. np.random.set_state(numpy_state) # Set other fields needed for computing task IDs. @@ -1471,7 +1476,7 @@ def put(value, worker=global_worker): if worker.mode == PYTHON_MODE: # In PYTHON_MODE, ray.put is the identity operation return value - object_id = local_scheduler.compute_put_id(worker.current_task_id, worker.put_index) + object_id = ray.local_scheduler.compute_put_id(worker.current_task_id, worker.put_index) worker.put_object(object_id, value) worker.put_index += 1 return object_id @@ -1504,8 +1509,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): object_id_strs = [object_id.id() for object_id in object_ids] timeout = timeout if timeout is not None else 2 ** 30 ready_ids, remaining_ids = worker.plasma_client.wait(object_id_strs, timeout, num_returns) - ready_ids = [local_scheduler.ObjectID(object_id) for object_id in ready_ids] - remaining_ids = [local_scheduler.ObjectID(object_id) for object_id in remaining_ids] + ready_ids = [ray.local_scheduler.ObjectID(object_id) for object_id in ready_ids] + remaining_ids = [ray.local_scheduler.ObjectID(object_id) for object_id in remaining_ids] return ready_ids, remaining_ids def wait_for_function(function_id, driver_id, timeout=5, worker=global_worker): @@ -1932,7 +1937,7 @@ def get_arguments_for_execution(function_name, serialized_args, worker=global_wo """ arguments = [] for (i, arg) in enumerate(serialized_args): - if isinstance(arg, local_scheduler.ObjectID): + if isinstance(arg, ray.local_scheduler.ObjectID): # get the object from the local object store argument = worker.get_object([arg])[0] if isinstance(argument, RayTaskError): @@ -1966,7 +1971,7 @@ def store_outputs_in_objstore(objectids, outputs, worker=global_worker): function. """ for i in range(len(objectids)): - if isinstance(outputs[i], local_scheduler.ObjectID): + if isinstance(outputs[i], ray.local_scheduler.ObjectID): raise Exception("This remote function returned an ObjectID as its {}th return value. This is not allowed.".format(i)) for i in range(len(objectids)): worker.put_object(objectids[i], outputs[i])