Python 3 compatibility. (#121)

* Make common module Python 3 compatible.

* Make plasma module Python 3 compatible.

* Make photon module Python 3 compatible.

* Make numbuf module Python 3 compatible.

* Remaining changes for Python 3 compatibility.

* Test Python 3 in Travis.

* Fixes.
This commit is contained in:
Robert Nishihara
2016-12-13 17:37:22 -08:00
parent 946242929f
commit 79dd1815a2
42 changed files with 652 additions and 302 deletions
+5 -1
View File
@@ -1 +1,5 @@
from lib.python.global_scheduler_services import *
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from .lib.python.global_scheduler_services import *
+1 -1
View File
@@ -1 +1 @@
from photon import *
from .photon import *
+6 -2
View File
@@ -1,2 +1,6 @@
from libphoton import *
from photon_services import *
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from .libphoton import *
from .photon_services import *
+5 -1
View File
@@ -1 +1,5 @@
from lib.python.plasma import *
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from plasma.plasma import *
View File
+7 -2
View File
@@ -1,2 +1,7 @@
import random, linalg
from core import *
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from . import random
from . import linalg
from .core import *
+1 -1
View File
@@ -154,7 +154,7 @@ def qr(a):
Ts = []
for i in range(min(a.num_blocks[0], a.num_blocks[1])): # this differs from the paper, which says "for i in range(a.num_blocks[1])", but that doesn't seem to make any sense when a.num_blocks[1] > a.num_blocks[0]
sub_dist_array = subblocks.remote(a_work, range(i, a_work.num_blocks[0]), [i])
sub_dist_array = subblocks.remote(a_work, list(range(i, a_work.num_blocks[0])), [i])
y, t, _, R = tsqr_hr.remote(sub_dist_array)
y_val = ray.get(y)
+7 -2
View File
@@ -1,2 +1,7 @@
import random, linalg
from core import *
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from . import random
from . import linalg
from .core import *
+8 -7
View File
@@ -100,12 +100,14 @@ def serialize(obj):
elif class_id in custom_serializers.keys():
serialized_obj = {"data": custom_serializers[class_id](obj)}
else:
if not hasattr(obj, "__dict__"):
raise Exception("We do not know how to serialize the object '{}'".format(obj))
serialized_obj = obj.__dict__
# Handle the namedtuple case.
if is_named_tuple(type(obj)):
# Handle the namedtuple case.
serialized_obj = {}
serialized_obj["_ray_getnewargs_"] = obj.__getnewargs__()
elif hasattr(obj, "__dict__"):
serialized_obj = obj.__dict__
else:
raise Exception("We do not know how to serialize the object '{}'".format(obj))
result = dict(serialized_obj, **{"_pytype_": class_id})
return result
@@ -131,11 +133,10 @@ def deserialize(serialized_obj):
# In this case, serialized_obj should just be the __dict__ field.
if "_ray_getnewargs_" in serialized_obj:
obj = cls.__new__(cls, *serialized_obj["_ray_getnewargs_"])
serialized_obj.pop("_ray_getnewargs_")
else:
obj = cls.__new__(cls)
serialized_obj.pop("_pytype_")
obj.__dict__.update(serialized_obj)
serialized_obj.pop("_pytype_")
obj.__dict__.update(serialized_obj)
return obj
# Register the callbacks with numbuf.
+33 -25
View File
@@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import cloudpickle
import hashlib
import os
import sys
@@ -440,11 +441,12 @@ class Worker(object):
"""
self.plasma_client.fetch([objectid.id()])
buff = self.plasma_client.get(objectid.id())
metadata = self.plasma_client.get_metadata(objectid.id())
metadata_size = len(metadata)
metadata_buff = self.plasma_client.get_metadata(objectid.id())
metadata_size = len(metadata_buff)
data = np.frombuffer(buff.buffer, dtype="byte")[8:]
metadata = np.frombuffer(metadata_buff.buffer, dtype="byte")
metadata_offset = int(np.frombuffer(buff.buffer, dtype="int64", count=1)[0])
serialized = numbuf.read_from_buffer(memoryview(data), bytearray(metadata), metadata_offset)
serialized = numbuf.read_from_buffer(memoryview(data), memoryview(metadata), metadata_offset)
# Create an ObjectFixture. If the object we are getting is backed by the
# PlasmaBuffer, this ObjectFixture will keep the PlasmaBuffer in scope as
# long as the object is in scope.
@@ -519,7 +521,7 @@ class Worker(object):
function_to_run_id = random_string()
key = "FunctionsToRun:{}".format(function_to_run_id)
self.redis_client.hmset(key, {"function_id": function_to_run_id,
"function": pickling.dumps(function)})
"function": cloudpickle.dumps(function)})
self.redis_client.rpush("Exports", key)
self.driver_export_counter += 1
@@ -580,15 +582,15 @@ def error_info(worker=global_worker):
"""Return information about failed tasks."""
check_main_thread()
check_connected(worker)
result = {"TaskError": [],
"RemoteFunctionImportError": [],
"ReusableVariableImportError": [],
"ReusableVariableReinitializeError": [],
"FunctionToRunError": []
result = {b"TaskError": [],
b"RemoteFunctionImportError": [],
b"ReusableVariableImportError": [],
b"ReusableVariableReinitializeError": [],
b"FunctionToRunError": []
}
error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1)
for error_key in error_keys:
error_type = error_key.split(":", 1)[0]
error_type = error_key.split(b":", 1)[0]
error_contents = worker.redis_client.hgetall(error_key)
result[error_type].append(error_contents)
@@ -718,7 +720,9 @@ def fetch_and_register_remote_function(key, worker=global_worker):
"""Import a remote function."""
function_id_str, function_name, serialized_function, num_return_vals, module, function_export_counter = worker.redis_client.hmget(key, ["function_id", "name", "function", "num_return_vals", "module", "function_export_counter"])
function_id = photon.ObjectID(function_id_str)
function_name = function_name.decode("ascii")
num_return_vals = int(num_return_vals)
module = module.decode("ascii")
function_export_counter = int(function_export_counter)
try:
function = pickling.loads(serialized_function)
@@ -746,6 +750,7 @@ def fetch_and_register_remote_function(key, worker=global_worker):
def fetch_and_register_reusable_variable(key, worker=global_worker):
"""Import a reusable variable."""
reusable_variable_name, serialized_initializer, serialized_reinitializer = worker.redis_client.hmget(key, ["name", "initializer", "reinitializer"])
reusable_variable_name = reusable_variable_name.decode("ascii")
try:
initializer = pickling.loads(serialized_initializer)
reinitializer = pickling.loads(serialized_reinitializer)
@@ -793,11 +798,11 @@ def import_thread(worker):
with worker.lock:
export_keys = worker.redis_client.lrange("Exports", 0, -1)
for key in export_keys:
if key.startswith("RemoteFunction"):
if key.startswith(b"RemoteFunction"):
fetch_and_register_remote_function(key, worker=worker)
elif key.startswith("ReusableVariables"):
elif key.startswith(b"ReusableVariables"):
fetch_and_register_reusable_variable(key, worker=worker)
elif key.startswith("FunctionsToRun"):
elif key.startswith(b"FunctionsToRun"):
fetch_and_execute_function_to_run(key, worker=worker)
else:
raise Exception("This code should be unreachable.")
@@ -808,16 +813,16 @@ def import_thread(worker):
with worker.lock:
if msg["type"] == "psubscribe":
continue
assert msg["data"] == "rpush"
assert msg["data"] == b"rpush"
num_imports = worker.redis_client.llen("Exports")
assert num_imports >= worker.worker_import_counter
for i in range(worker.worker_import_counter, num_imports):
key = worker.redis_client.lindex("Exports", i)
if key.startswith("RemoteFunction"):
if key.startswith(b"RemoteFunction"):
fetch_and_register_remote_function(key, worker=worker)
elif key.startswith("ReusableVariables"):
elif key.startswith(b"ReusableVariables"):
fetch_and_register_reusable_variable(key, worker=worker)
elif key.startswith("FunctionsToRun"):
elif key.startswith(b"FunctionsToRun"):
fetch_and_execute_function_to_run(key, worker=worker)
else:
raise Exception("This code should be unreachable.")
@@ -1125,7 +1130,7 @@ def main_loop(worker=global_worker):
# export counter for the task. If not, wait until we have imported enough.
while True:
with worker.lock:
if worker.functions.has_key(function_id.id()) and (worker.function_export_counters[function_id.id()] <= worker.worker_import_counter):
if function_id.id() in worker.functions and (worker.function_export_counters[function_id.id()] <= worker.worker_import_counter):
break
time.sleep(0.001)
# Execute the task.
@@ -1207,7 +1212,7 @@ def remote(*args, **kwargs):
def remote_decorator(func):
func_name = "{}.{}".format(func.__module__, func.__name__)
if func_id is None:
function_id = FunctionID((hashlib.sha256(func_name).digest())[:20])
function_id = FunctionID((hashlib.sha256(func_name.encode("ascii")).digest())[:20])
else:
function_id = func_id
@@ -1216,7 +1221,7 @@ def remote(*args, **kwargs):
check_main_thread()
check_connected()
args = list(args)
args.extend([kwargs[keyword] if kwargs.has_key(keyword) else default for keyword, default in keyword_defaults[len(args):]]) # fill in the remaining arguments
args.extend([kwargs[keyword] if keyword in kwargs else default for keyword, default in keyword_defaults[len(args):]]) # fill in the remaining arguments
if any([arg is funcsigs._empty for arg in args]):
raise Exception("Not enough arguments were provided to {}.".format(func_name))
if _mode() == PYTHON_MODE:
@@ -1249,9 +1254,12 @@ def remote(*args, **kwargs):
func_invoker.is_remote = True
func_name = "{}.{}".format(func.__module__, func.__name__)
func_invoker.func_name = func_name
func_invoker.func_doc = func.func_doc
if sys.version_info >= (3, 0):
func_invoker.__doc__ = func.__doc__
else:
func_invoker.func_doc = func.func_doc
sig_params = [(k, v) for k, v in funcsigs.signature(func).parameters.iteritems()]
sig_params = [(k, v) for k, v in funcsigs.signature(func).parameters.items()]
keyword_defaults = [(k, v.default) for k, v in sig_params]
has_vararg_param = any([v.kind == v.VAR_POSITIONAL for k, v in sig_params])
func_invoker.has_vararg_param = has_vararg_param
@@ -1279,7 +1287,7 @@ def remote(*args, **kwargs):
return remote_decorator
if _mode() == WORKER_MODE:
if kwargs.has_key("function_id"):
if "function_id" in kwargs:
num_return_vals = kwargs["num_return_vals"]
function_id = kwargs["function_id"]
return make_remote_decorator(num_return_vals, function_id)
@@ -1292,9 +1300,9 @@ def remote(*args, **kwargs):
else:
# This is the case where the decorator is something like
# @ray.remote(num_return_vals=2).
assert len(args) == 0 and kwargs.has_key("num_return_vals"), "The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied with only the argument num_return_vals, like '@ray.remote(num_return_vals=2)'."
assert len(args) == 0 and "num_return_vals" in kwargs, "The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied with only the argument num_return_vals, like '@ray.remote(num_return_vals=2)'."
num_return_vals = kwargs["num_return_vals"]
assert not kwargs.has_key("function_id")
assert not "function_id" in kwargs
return make_remote_decorator(num_return_vals)
def check_signature_supported(has_kwargs_param, has_vararg_param, keyword_defaults, name):
+3 -3
View File
@@ -24,9 +24,9 @@ setup(name="ray",
version="0.0.1",
packages=find_packages(),
package_data={"common": ["thirdparty/redis/src/redis-server"],
"plasma": ["build/plasma_store",
"build/plasma_manager",
"lib/python/libplasma.so"],
"plasma": ["plasma_store",
"plasma_manager",
"libplasma.so"],
"photon": ["build/photon_scheduler",
"photon/libphoton.so"],
"global_scheduler": ["build/global_scheduler"]},