Rename photon -> local scheduler. (#322)

This commit is contained in:
Robert Nishihara
2017-02-27 12:24:07 -08:00
committed by Philipp Moritz
parent a30eed452e
commit 1ae7e7d29e
36 changed files with 758 additions and 688 deletions
+4 -4
View File
@@ -6,7 +6,7 @@ import hashlib
import inspect
import json
import numpy as np
import photon
import local_scheduler
import random
import traceback
@@ -30,7 +30,7 @@ def random_string():
return np.random.bytes(20)
def random_actor_id():
return photon.ObjectID(random_string())
return local_scheduler.ObjectID(random_string())
def get_actor_method_function_id(attr):
"""Get the function ID corresponding to an actor method.
@@ -45,13 +45,13 @@ def get_actor_method_function_id(attr):
function_id_hash.update(attr.encode("ascii"))
function_id = function_id_hash.digest()
assert len(function_id) == 20
return photon.ObjectID(function_id)
return local_scheduler.ObjectID(function_id)
def fetch_and_register_actor(key, worker):
"""Import an actor."""
driver_id, actor_id_str, actor_name, module, pickled_class, assigned_gpu_ids, actor_method_names = \
worker.redis_client.hmget(key, ["driver_id", "actor_id", "name", "module", "class", "gpu_ids", "actor_method_names"])
actor_id = photon.ObjectID(actor_id_str)
actor_id = local_scheduler.ObjectID(actor_id_str)
actor_name = actor_name.decode("ascii")
module = module.decode("ascii")
actor_method_names = json.loads(actor_method_names.decode("ascii"))
+1 -1
View File
@@ -6,6 +6,6 @@ def get_local_schedulers(worker):
local_schedulers = []
for client in worker.redis_client.keys("CL:*"):
client_info = worker.redis_client.hgetall(client)
if client_info[b"client_type"] == b"photon":
if client_info[b"client_type"] == b"local_scheduler":
local_schedulers.append(client_info)
return local_schedulers
+15 -14
View File
@@ -17,7 +17,7 @@ import time
import threading
# Ray modules
import photon
import local_scheduler
import plasma
import global_scheduler
@@ -43,7 +43,7 @@ all_processes = OrderedDict([(PROCESS_TYPE_WORKER, []),
(PROCESS_TYPE_WEB_UI, [])])
# True if processes are run in the valgrind profiler.
RUN_PHOTON_PROFILER = False
RUN_LOCAL_SCHEDULER_PROFILER = False
RUN_PLASMA_MANAGER_PROFILER = False
RUN_PLASMA_STORE_PROFILER = False
@@ -90,7 +90,7 @@ def kill_process(p):
"""
if p.poll() is not None: # process has already terminated
return True
if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER:
if RUN_LOCAL_SCHEDULER_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER:
os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data.
time.sleep(0.1) # Wait for profiling data to be written.
@@ -415,17 +415,18 @@ def start_local_scheduler(redis_address,
if num_gpus is None:
# By default, assume this node has no GPUs.
num_gpus = 0
local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name,
plasma_manager_name,
worker_path=worker_path,
node_ip_address=node_ip_address,
redis_address=redis_address,
plasma_address=plasma_address,
use_profiler=RUN_PHOTON_PROFILER,
stdout_file=stdout_file,
stderr_file=stderr_file,
static_resource_list=[num_cpus, num_gpus],
num_workers=num_workers)
local_scheduler_name, p = local_scheduler.start_local_scheduler(
plasma_store_name,
plasma_manager_name,
worker_path=worker_path,
node_ip_address=node_ip_address,
redis_address=redis_address,
plasma_address=plasma_address,
use_profiler=RUN_LOCAL_SCHEDULER_PROFILER,
stdout_file=stdout_file,
stderr_file=stderr_file,
static_resource_list=[num_cpus, num_gpus],
num_workers=num_workers)
if cleanup:
all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p)
return local_scheduler_name
+38 -37
View File
@@ -26,7 +26,7 @@ import ray.pickling as pickling
import ray.serialization as serialization
import ray.services as services
import numbuf
import photon
import local_scheduler
import plasma
SCRIPT_MODE = 0
@@ -53,7 +53,7 @@ def random_string():
return np.random.bytes(20)
def random_object_id():
return photon.ObjectID(random_string())
return local_scheduler.ObjectID(random_string())
class FunctionID(object):
def __init__(self, function_id):
@@ -478,7 +478,7 @@ class Worker(object):
# until GET_TIMEOUT_MILLISECONDS milliseconds passes, then repeat.
while len(unready_ids) > 0:
for unready_id in unready_ids:
self.photon_client.reconstruct_object(unready_id)
self.local_scheduler_client.reconstruct_object(unready_id)
# Do another fetch for objects that aren't available locally yet, in case
# they were evicted since the last fetch.
self.plasma_client.fetch(list(unready_ids.keys()))
@@ -496,7 +496,7 @@ class Worker(object):
# If there were objects that we weren't able to get locally, let the local
# scheduler know that we're now unblocked.
if was_blocked:
self.photon_client.notify_unblocked()
self.local_scheduler_client.notify_unblocked()
# Unwrap the object from the list (it was wrapped in put_object).
assert len(final_results) == len(object_ids)
@@ -504,7 +504,7 @@ class Worker(object):
assert final_results[i][0] == object_ids[i].id()
return [result[1][0] for result in final_results]
def submit_task(self, function_id, func_name, args, actor_id=photon.ObjectID(NIL_ACTOR_ID)):
def submit_task(self, function_id, func_name, args, actor_id=local_scheduler.ObjectID(NIL_ACTOR_ID)):
"""Submit a remote task to the scheduler.
Tell the scheduler to schedule the execution of the function with name
@@ -521,32 +521,33 @@ class Worker(object):
check_main_thread()
# Put large or complex arguments that are passed by value in the object
# store first.
args_for_photon = []
args_for_local_scheduler = []
for arg in args:
if isinstance(arg, photon.ObjectID):
args_for_photon.append(arg)
elif photon.check_simple_value(arg):
args_for_photon.append(arg)
if isinstance(arg, local_scheduler.ObjectID):
args_for_local_scheduler.append(arg)
elif local_scheduler.check_simple_value(arg):
args_for_local_scheduler.append(arg)
else:
args_for_photon.append(put(arg))
args_for_local_scheduler.append(put(arg))
# Look up the various function properties.
num_return_vals, num_cpus, num_gpus = self.function_properties[self.task_driver_id.id()][function_id.id()]
# Submit the task to Photon.
task = photon.Task(self.task_driver_id,
photon.ObjectID(function_id.id()),
args_for_photon,
num_return_vals,
self.current_task_id,
self.task_index,
actor_id, self.actor_counters[actor_id],
[num_cpus, num_gpus])
# Submit the task to the local scheduler.
task = local_scheduler.Task(
self.task_driver_id,
local_scheduler.ObjectID(function_id.id()),
args_for_local_scheduler,
num_return_vals,
self.current_task_id,
self.task_index,
actor_id, self.actor_counters[actor_id],
[num_cpus, num_gpus])
# Increment the worker's task index to track how many tasks have been
# submitted by the current task so far.
self.task_index += 1
self.actor_counters[actor_id] += 1
self.photon_client.submit(task)
self.local_scheduler_client.submit(task)
return task.returns()
@@ -691,8 +692,8 @@ def initialize_numbuf(worker=global_worker):
contained_objectids.append(obj)
return obj.id()
def objectid_custom_deserializer(serialized_obj):
return photon.ObjectID(serialized_obj)
serialization.add_class_to_whitelist(photon.ObjectID, pickle=False, custom_serializer=objectid_custom_serializer, custom_deserializer=objectid_custom_deserializer)
return local_scheduler.ObjectID(serialized_obj)
serialization.add_class_to_whitelist(local_scheduler.ObjectID, pickle=False, custom_serializer=objectid_custom_serializer, custom_deserializer=objectid_custom_deserializer)
if worker.mode in [SCRIPT_MODE, SILENT_MODE]:
# These should only be called on the driver because register_class will
@@ -721,7 +722,7 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address):
if info[b"node_ip_address"].decode("ascii") == node_ip_address:
if info[b"client_type"].decode("ascii") == "plasma_manager":
plasma_managers.append(info)
elif info[b"client_type"].decode("ascii") == "photon":
elif info[b"client_type"].decode("ascii") == "local_scheduler":
local_schedulers.append(info)
# Make sure that we got at least one plasma manager and local scheduler.
assert len(plasma_managers) >= 1
@@ -945,8 +946,8 @@ def cleanup(worker=global_worker):
clusters in the tests, but the import and exit only happen once.
"""
disconnect(worker)
if hasattr(worker, "photon_client"):
del worker.photon_client
if hasattr(worker, "local_scheduler_client"):
del worker.local_scheduler_client
if hasattr(worker, "plasma_client"):
worker.plasma_client.shutdown()
@@ -1040,7 +1041,7 @@ def fetch_and_register_remote_function(key, worker=global_worker):
"module",
"num_cpus",
"num_gpus"])
function_id = photon.ObjectID(function_id_str)
function_id = local_scheduler.ObjectID(function_id_str)
function_name = function_name.decode("ascii")
num_return_vals = int(num_return_vals)
num_cpus = int(num_cpus)
@@ -1208,7 +1209,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a
# Create an object store client.
worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"])
# Create the local scheduler client.
worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"], worker.actor_id)
worker.local_scheduler_client = local_scheduler.LocalSchedulerClient(info["local_scheduler_socket_name"], worker.actor_id)
# Register the worker with Redis.
if mode in [SCRIPT_MODE, SILENT_MODE]:
# The concept of a driver is the same as the concept of a "job". Register
@@ -1244,12 +1245,12 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a
else:
# Try to use true randomness.
np.random.seed(None)
worker.current_task_id = photon.ObjectID(np.random.bytes(20))
worker.current_task_id = local_scheduler.ObjectID(np.random.bytes(20))
# When tasks are executed on remote workers in the context of multiple
# drivers, the task driver ID is used to keep track of which driver is
# responsible for the task so that error messages will be propagated to the
# correct driver.
worker.task_driver_id = photon.ObjectID(worker.worker_id)
worker.task_driver_id = local_scheduler.ObjectID(worker.worker_id)
# Reset the state of the numpy random number generator.
np.random.set_state(numpy_state)
# Set other fields needed for computing task IDs.
@@ -1411,7 +1412,7 @@ def flush_log(worker=global_worker):
"""Send the logged worker events to the global state store."""
event_log_key = b"event_log:" + worker.worker_id + b":" + worker.current_task_id.id()
event_log_value = json.dumps(worker.events)
worker.photon_client.log_event(event_log_key, event_log_value)
worker.local_scheduler_client.log_event(event_log_key, event_log_value)
worker.events = []
def get(object_ids, worker=global_worker):
@@ -1466,7 +1467,7 @@ def put(value, worker=global_worker):
if worker.mode == PYTHON_MODE:
# In PYTHON_MODE, ray.put is the identity operation
return value
object_id = photon.compute_put_id(worker.current_task_id, worker.put_index)
object_id = local_scheduler.compute_put_id(worker.current_task_id, worker.put_index)
worker.put_object(object_id, value)
worker.put_index += 1
return object_id
@@ -1499,8 +1500,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
object_id_strs = [object_id.id() for object_id in object_ids]
timeout = timeout if timeout is not None else 2 ** 30
ready_ids, remaining_ids = worker.plasma_client.wait(object_id_strs, timeout, num_returns)
ready_ids = [photon.ObjectID(object_id) for object_id in ready_ids]
remaining_ids = [photon.ObjectID(object_id) for object_id in remaining_ids]
ready_ids = [local_scheduler.ObjectID(object_id) for object_id in ready_ids]
remaining_ids = [local_scheduler.ObjectID(object_id) for object_id in remaining_ids]
return ready_ids, remaining_ids
def wait_for_function(function_id, driver_id, timeout=5, worker=global_worker):
@@ -1660,7 +1661,7 @@ def main_loop(worker=global_worker):
check_main_thread()
while True:
with log_span("ray:get_task", worker=worker):
task = worker.photon_client.get_task()
task = worker.local_scheduler_client.get_task()
function_id = task.function_id()
# Wait until the function to be executed has actually been registered on
@@ -1927,7 +1928,7 @@ def get_arguments_for_execution(function_name, serialized_args, worker=global_wo
"""
arguments = []
for (i, arg) in enumerate(serialized_args):
if isinstance(arg, photon.ObjectID):
if isinstance(arg, local_scheduler.ObjectID):
# get the object from the local object store
argument = worker.get_object([arg])[0]
if isinstance(argument, RayTaskError):
@@ -1961,7 +1962,7 @@ def store_outputs_in_objstore(objectids, outputs, worker=global_worker):
function.
"""
for i in range(len(objectids)):
if isinstance(outputs[i], photon.ObjectID):
if isinstance(outputs[i], local_scheduler.ObjectID):
raise Exception("This remote function returned an ObjectID as its {}th return value. This is not allowed.".format(i))
for i in range(len(objectids)):
worker.put_object(objectids[i], outputs[i])