Migrate Python C extension to Cython (#3541)

This commit is contained in:
Si-Yuan
2019-01-25 01:17:14 +08:00
committed by Philipp Moritz
parent c1a52b1c86
commit 48139cf861
49 changed files with 1909 additions and 1827 deletions
+18 -17
View File
@@ -223,7 +223,7 @@ class FunctionDescriptor(object):
ray.ObjectID to represent the function descriptor.
"""
if self.is_for_driver_task:
return ray.ObjectID.nil_id()
return ray.FunctionID.nil()
function_id_hash = hashlib.sha1()
# Include the function module and name in the hash.
function_id_hash.update(self.module_name.encode("ascii"))
@@ -232,7 +232,7 @@ class FunctionDescriptor(object):
function_id_hash.update(self._function_source_hash)
# Compute the function ID.
function_id = function_id_hash.digest()
return ray.ObjectID(function_id)
return ray.FunctionID(function_id)
def get_function_descriptor_list(self):
"""Return a list of bytes representing the function descriptor.
@@ -355,13 +355,13 @@ class FunctionActorManager(object):
check_oversized_pickle(pickled_function,
remote_function._function_name,
"remote function", self._worker)
key = (b"RemoteFunction:" + self._worker.task_driver_id.id() + b":" +
remote_function._function_descriptor.function_id.id())
key = (b"RemoteFunction:" + self._worker.task_driver_id.binary() + b":"
+ remote_function._function_descriptor.function_id.binary())
self._worker.redis_client.hmset(
key, {
"driver_id": self._worker.task_driver_id.id(),
"driver_id": self._worker.task_driver_id.binary(),
"function_id": remote_function._function_descriptor.
function_id.id(),
function_id.binary(),
"name": remote_function._function_name,
"module": function.__module__,
"function": pickled_function,
@@ -377,8 +377,8 @@ class FunctionActorManager(object):
"driver_id", "function_id", "name", "function", "num_return_vals",
"module", "resources", "max_calls"
])
function_id = ray.ObjectID(function_id_str)
driver_id = ray.ObjectID(driver_id_str)
function_id = ray.FunctionID(function_id_str)
driver_id = ray.DriverID(driver_id_str)
function_name = decode(function_name)
max_calls = int(max_calls)
module = decode(module)
@@ -406,7 +406,7 @@ class FunctionActorManager(object):
traceback_str,
driver_id=driver_id,
data={
"function_id": function_id.id(),
"function_id": function_id.binary(),
"function_name": function_name
})
else:
@@ -423,7 +423,8 @@ class FunctionActorManager(object):
max_calls=max_calls))
# Add the function to the function table.
self._worker.redis_client.rpush(
b"FunctionTable:" + function_id.id(), self._worker.worker_id)
b"FunctionTable:" + function_id.binary(),
self._worker.worker_id)
def get_execution_info(self, driver_id, function_descriptor):
"""Get the FunctionExecutionInfo of a remote function.
@@ -524,14 +525,14 @@ class FunctionActorManager(object):
"You might have started a background thread in a non-actor task, "
"please make sure the thread finishes before the task finishes.")
driver_id = self._worker.task_driver_id
key = (b"ActorClass:" + driver_id.id() + b":" +
function_descriptor.function_id.id())
key = (b"ActorClass:" + driver_id.binary() + b":" +
function_descriptor.function_id.binary())
actor_class_info = {
"class_name": Class.__name__,
"module": Class.__module__,
"class": pickle.dumps(Class),
"checkpoint_interval": checkpoint_interval,
"driver_id": driver_id.id(),
"driver_id": driver_id.binary(),
"actor_method_names": json.dumps(list(actor_method_names))
}
@@ -556,8 +557,8 @@ class FunctionActorManager(object):
# because of https://github.com/ray-project/ray/issues/1146.
def load_actor(self, driver_id, function_descriptor):
key = (b"ActorClass:" + driver_id.id() + b":" +
function_descriptor.function_id.id())
key = (b"ActorClass:" + driver_id.binary() + b":" +
function_descriptor.function_id.binary())
# Wait for the actor class key to have been imported by the
# import thread. TODO(rkn): It shouldn't be possible to end
# up in an infinite loop here, but we should push an error to
@@ -588,7 +589,7 @@ class FunctionActorManager(object):
class_name = decode(class_name)
module = decode(module)
driver_id = ray.ObjectID(driver_id_str)
driver_id = ray.DriverID(driver_id_str)
checkpoint_interval = int(checkpoint_interval)
actor_method_names = json.loads(decode(actor_method_names))
@@ -645,7 +646,7 @@ class FunctionActorManager(object):
ray_constants.REGISTER_ACTOR_PUSH_ERROR,
traceback_str,
driver_id,
data={"actor_id": actor_id.id()})
data={"actor_id": actor_id.binary()})
# TODO(rkn): In the future, it might make sense to have the worker
# exit here. However, currently that would lead to hanging if
# someone calls ray.get on a method invoked on the actor.