allow driver to run in PYTHON_MODE, which is equivalent to serial Python

This commit is contained in:
Robert Nishihara
2016-06-26 13:43:54 -07:00
parent e8e378883a
commit d97bce0d64
5 changed files with 44 additions and 1 deletions
+1
View File
@@ -4,6 +4,7 @@
SCRIPT_MODE = 0
WORKER_MODE = 1
SHELL_MODE = 2
PYTHON_MODE = 2
import libraylib as lib
import serialization
+2 -1
View File
@@ -105,7 +105,8 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
time.sleep(0.5)
# driver_mode should equal ray.SCRIPT_MODE if this is being run in a script and
# ray.SHELL_MODE if it is being used interactively in a shell.
# ray.SHELL_MODE if it is being used interactively in a shell. It can also equal
# ray.PYTHON_MODE to run things in a manner equivalent to serial Python code.
def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None, driver_mode=ray.SCRIPT_MODE):
global drivers
if num_workers_per_objstore > 0 and worker_path is None:
+10
View File
@@ -7,6 +7,7 @@ import typing
import funcsigs
import numpy as np
import colorama
import copy
import ray
from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP
@@ -170,6 +171,8 @@ def disconnect(worker=global_worker):
ray.lib.disconnect(worker.handle)
def get(objref, worker=global_worker):
if worker.mode == ray.PYTHON_MODE:
return objref # In ray.PYTHON_MODE, ray.get is the identity operation (the input will actually be a value not an objref)
ray.lib.request_object(worker.handle, objref)
if worker.mode == ray.SHELL_MODE or worker.mode == ray.SCRIPT_MODE:
print_task_info(ray.lib.task_info(worker.handle), worker.mode)
@@ -179,6 +182,8 @@ def get(objref, worker=global_worker):
return value
def put(value, worker=global_worker):
if worker.mode == ray.PYTHON_MODE:
return value # In ray.PYTHON_MODE, ray.put is the identity operation
objref = ray.lib.get_objref(worker.handle)
worker.put_object(objref, value)
if worker.mode == ray.SHELL_MODE or worker.mode == ray.SCRIPT_MODE:
@@ -225,6 +230,11 @@ def remote(arg_types, return_types, worker=global_worker):
"""This is what gets run immediately when a worker calls a remote function."""
args = list(args)
args.extend([kwargs[keyword] if kwargs.has_key(keyword) else default for keyword, default in func_call.keyword_defaults[len(args):]]) # fill in the remaining arguments
if worker.mode == ray.PYTHON_MODE:
# In ray.PYTHON_MODE, remote calls simply execute the function. We copy
# the arguments to prevent the function call from mutating them and to
# match the usual behavior of immutable remote objects.
return func(*copy.deepcopy(args))
check_arguments(func_call, args) # throws an exception if args are invalid
objrefs = worker.submit_task(func_call.func_name, args)
if len(objrefs) == 1: