mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 02:01:24 +08:00
Fixing Redis Key Consistencies for Actor, FunctionTable, FunctionsToRun, and RemoteFunction (#659)
* consistencies for Actor, FunctionTable, and FunctionsToRun * NOT WORKING: changing remote fn keys
This commit is contained in:
committed by
Philipp Moritz
parent
d4d2c03ac5
commit
8d350f628a
+1
-1
@@ -228,7 +228,7 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus,
|
||||
if worker.mode is None:
|
||||
raise Exception("Actors cannot be created before Ray has been started. "
|
||||
"You can start Ray with 'ray.init()'.")
|
||||
key = "Actor:{}".format(actor_id.id())
|
||||
key = b"Actor:" + actor_id.id()
|
||||
|
||||
# For now, all actor methods have 1 return value.
|
||||
driver_id = worker.task_driver_id.id()
|
||||
|
||||
@@ -500,7 +500,7 @@ class Worker(object):
|
||||
pickled_function = pickle.dumps(function)
|
||||
|
||||
function_to_run_id = random_string()
|
||||
key = "FunctionsToRun:{}".format(function_to_run_id)
|
||||
key = b"FunctionsToRun:" + function_to_run_id
|
||||
# First run the function on the driver. Pass in the number of workers on
|
||||
# this node that have already started executing this remote function,
|
||||
# and increment that value. Subtract 1 so that the counter starts at 0.
|
||||
@@ -630,9 +630,9 @@ def error_info(worker=global_worker):
|
||||
if function_id == NIL_FUNCTION_ID:
|
||||
function_name = b"Driver"
|
||||
else:
|
||||
task_driver_id = worker.task_driver_id
|
||||
function_name = worker.redis_client.hget(
|
||||
"RemoteFunction:{}:{}".format(worker.task_driver_id,
|
||||
function_id),
|
||||
b"RemoteFunction:" + task_driver_id.id() + b":" + function_id,
|
||||
"name")
|
||||
error_contents[b"data"] = function_name
|
||||
errors.append(error_contents)
|
||||
@@ -1103,7 +1103,7 @@ def fetch_and_register_remote_function(key, worker=global_worker):
|
||||
worker.functions[driver_id][function_id.id()] = (
|
||||
function_name, remote(function_id=function_id)(function))
|
||||
# Add the function to the function table.
|
||||
worker.redis_client.rpush("FunctionTable:{}".format(function_id.id()),
|
||||
worker.redis_client.rpush(b"FunctionTable:" + function_id.id(),
|
||||
worker.worker_id)
|
||||
|
||||
|
||||
@@ -1297,7 +1297,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
|
||||
info["manager_socket_name"])
|
||||
# Create the local scheduler client.
|
||||
if worker.actor_id != NIL_ACTOR_ID:
|
||||
num_gpus = int(worker.redis_client.hget("Actor:{}".format(actor_id),
|
||||
num_gpus = int(worker.redis_client.hget(b"Actor:" + actor_id,
|
||||
"num_gpus"))
|
||||
else:
|
||||
num_gpus = 0
|
||||
@@ -1358,7 +1358,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
|
||||
|
||||
# If this is an actor, get the ID of the corresponding class for the actor.
|
||||
if worker.actor_id != NIL_ACTOR_ID:
|
||||
actor_key = "Actor:{}".format(worker.actor_id)
|
||||
actor_key = b"Actor:" + worker.actor_id
|
||||
class_id = worker.redis_client.hget(actor_key, "class_id")
|
||||
worker.class_id = class_id
|
||||
|
||||
@@ -1845,7 +1845,8 @@ def export_remote_function(function_id, func_name, func, func_invoker,
|
||||
|
||||
worker.function_properties[worker.task_driver_id.id()][function_id.id()] = (
|
||||
num_return_vals, num_cpus, num_gpus)
|
||||
key = "RemoteFunction:{}:{}".format(worker.task_driver_id, function_id.id())
|
||||
task_driver_id = worker.task_driver_id
|
||||
key = b"RemoteFunction:" + task_driver_id.id() + b":" + function_id.id()
|
||||
|
||||
# Work around limitations of Python pickling.
|
||||
func_name_global_valid = func.__name__ in func.__globals__
|
||||
|
||||
Reference in New Issue
Block a user