From 79dd1815a27c066906bc138ba48036ea472e104a Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 13 Dec 2016 17:37:22 -0800 Subject: [PATCH] Python 3 compatibility. (#121) * Make common module Python 3 compatible. * Make plasma module Python 3 compatible. * Make photon module Python 3 compatible. * Make numbuf module Python 3 compatible. * Remaining changes for Python 3 compatibility. * Test Python 3 in Travis. * Fixes. --- .travis.yml | 43 ++---- .travis/install-dependencies.sh | 68 +++++++++ .travis/install-ray.sh | 48 ++++++ .travis/run-c-tests.sh | 22 +++ build.sh | 8 +- install-dependencies.sh | 39 ----- lib/python/global_scheduler/__init__.py | 6 +- lib/python/photon/__init__.py | 2 +- lib/python/photon/photon/__init__.py | 8 +- lib/python/plasma/{build => }/.gitkeep | 0 lib/python/plasma/__init__.py | 6 +- lib/python/plasma/lib/__init__.py | 0 lib/python/plasma/lib/python/__init__.py | 0 lib/python/ray/array/distributed/__init__.py | 9 +- lib/python/ray/array/distributed/linalg.py | 2 +- lib/python/ray/array/remote/__init__.py | 9 +- lib/python/ray/serialization.py | 15 +- lib/python/ray/worker.py | 58 ++++---- lib/python/setup.py | 6 +- numbuf/CMakeLists.txt | 6 +- numbuf/python/src/pynumbuf/adapters/python.cc | 25 ++-- numbuf/python/src/pynumbuf/numbuf.cc | 67 +++++++-- numbuf/python/test/runtest.py | 10 +- src/common/lib/python/common_extension.c | 138 ++++++++++-------- src/common/lib/python/common_module.c | 46 +++++- src/common/redis_module/runtest.py | 10 +- src/common/test/test.py | 21 ++- src/global_scheduler/test/test.py | 4 +- src/photon/CMakeLists.txt | 6 +- src/photon/photon/__init__.py | 4 +- src/photon/photon_extension.c | 57 ++++++-- src/photon/test/test.py | 2 +- src/plasma/CMakeLists.txt | 8 +- src/plasma/plasma/__init__.py | 5 + src/plasma/{lib/python => plasma}/plasma.py | 14 +- src/plasma/plasma_extension.c | 86 +++++++++-- src/plasma/{lib/python => }/setup.py | 13 +- src/plasma/test/test.py | 12 +- test/array_test.py | 4 + test/failure_test.py | 40 ++--- test/microbenchmarks.py | 4 + test/runtest.py | 23 ++- 42 files changed, 652 insertions(+), 302 deletions(-) create mode 100755 .travis/install-dependencies.sh create mode 100755 .travis/install-ray.sh create mode 100755 .travis/run-c-tests.sh delete mode 100755 install-dependencies.sh rename lib/python/plasma/{build => }/.gitkeep (100%) delete mode 100644 lib/python/plasma/lib/__init__.py delete mode 100644 lib/python/plasma/lib/python/__init__.py create mode 100644 src/plasma/plasma/__init__.py rename src/plasma/{lib/python => plasma}/plasma.py (96%) rename src/plasma/{lib/python => }/setup.py (60%) diff --git a/.travis.yml b/.travis.yml index 66ac88b99..01382bbac 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,19 +6,18 @@ matrix: include: - os: linux dist: trusty - python: "2.7" + env: PYTHON=2.7 - os: linux dist: trusty - python: "3.5" + env: PYTHON=3.5 - os: osx osx_image: xcode7 - python: "2.7" + env: PYTHON=2.7 - os: osx osx_image: xcode7 - python: "3.5" + env: PYTHON=3.5 - os: linux dist: trusty - python: "2.7" env: LINT=1 before_install: # In case we ever want to use a different version of clang-format: @@ -31,8 +30,7 @@ matrix: - .travis/check-git-clang-format-output.sh - os: linux dist: trusty - python: "2.7" - env: VALGRIND=1 + env: VALGRIND=1 PYTHON=2.7 before_install: - sudo apt-get update -qq - sudo apt-get install -qq valgrind @@ -50,34 +48,13 @@ matrix: - python src/global_scheduler/test/test.py valgrind install: - - ./install-dependencies.sh - - ./build.sh - - - cd src/common - - make test - - cd ../.. - - - cd src/plasma - - make test - - cd ../.. - - - cd src/photon - - make test - - cd ../.. - - - cd numbuf - - sudo python setup.py install - - cd .. - - - cd src/common/lib/python - - sudo python setup.py install - - cd ../../../.. - - - cd lib/python - - sudo python setup.py install - - cd ../.. + - ./.travis/install-dependencies.sh + - ./.travis/run-c-tests.sh + - ./.travis/install-ray.sh script: + - if [[ "$PYTHON" == "3.5" ]]; then export PATH="$HOME/miniconda/bin:$PATH"; fi + - python numbuf/python/test/runtest.py - python src/common/test/test.py diff --git a/.travis/install-dependencies.sh b/.travis/install-dependencies.sh new file mode 100755 index 000000000..457851296 --- /dev/null +++ b/.travis/install-dependencies.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) + +echo "PYTHON is $PYTHON" + +platform="unknown" +unamestr="$(uname)" +if [[ "$unamestr" == "Linux" ]]; then + echo "Platform is linux." + platform="linux" +elif [[ "$unamestr" == "Darwin" ]]; then + echo "Platform is macosx." + platform="macosx" +else + echo "Unrecognized platform." + exit 1 +fi + +if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then + sudo apt-get update + sudo apt-get install -y cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip nodejs npm + sudo pip install funcsigs colorama psutil redis + sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 +elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then + sudo apt-get update + sudo apt-get install -y cmake python-dev python-numpy build-essential autoconf curl libtool libboost-all-dev unzip nodejs npm + # Install miniconda. + wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh + bash miniconda.sh -b -p $HOME/miniconda + export PATH="$HOME/miniconda/bin:$PATH" + pip install numpy funcsigs colorama psutil redis + pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 +elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then + # check that brew is installed + which -s brew + if [[ $? != 0 ]]; then + echo "Could not find brew, please install brew (see http://brew.sh/)." + exit 1 + else + echo "Updating brew." + brew update + fi + brew install cmake automake autoconf libtool boost node + sudo easy_install pip + sudo pip install numpy funcsigs colorama psutil redis --ignore-installed six + sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 +elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then + # check that brew is installed + which -s brew + if [[ $? != 0 ]]; then + echo "Could not find brew, please install brew (see http://brew.sh/)." + exit 1 + else + echo "Updating brew." + brew update + fi + brew install cmake automake autoconf libtool boost node + # Install miniconda. + wget https://repo.continuum.io/miniconda/Miniconda3-latest-MacOSX-x86_64.sh -O miniconda.sh + bash miniconda.sh -b -p $HOME/miniconda + export PATH="$HOME/miniconda/bin:$PATH" + pip install numpy funcsigs colorama psutil redis + pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 +else + echo "Unrecognized environment." + exit 1 +fi diff --git a/.travis/install-ray.sh b/.travis/install-ray.sh new file mode 100755 index 000000000..5ce8e52e4 --- /dev/null +++ b/.travis/install-ray.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + +# Cause the script to exit if a single command fails. +set -e + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) + +echo "PYTHON is $PYTHON" + +if [[ "$PYTHON" == "2.7" ]]; then + + pushd "$ROOT_DIR/../numbuf" + sudo python setup.py install + popd + + pushd "$ROOT_DIR/../src/common/lib/python" + pushd "$ROOT_DIR/../src/common" + make + popd + sudo python setup.py install + popd + + pushd "$ROOT_DIR/../lib/python" + sudo python setup.py install + popd + +elif [[ "$PYTHON" == "3.5" ]]; then + export PATH="$HOME/miniconda/bin:$PATH" + + pushd "$ROOT_DIR/../numbuf" + python setup.py install --user + popd + + pushd "$ROOT_DIR/../src/common/lib/python" + pushd "$ROOT_DIR/../src/common" + make + popd + python setup.py install --user + popd + + pushd "$ROOT_DIR/../lib/python" + python setup.py install --user + popd + +else + echo "Unrecognized Python version." + exit 1 +fi diff --git a/.travis/run-c-tests.sh b/.travis/run-c-tests.sh new file mode 100755 index 000000000..c698919ef --- /dev/null +++ b/.travis/run-c-tests.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +# Cause the script to exit if a single command fails. +set -e + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) + +echo "PYTHON is $PYTHON" + +if [[ "$PYTHON" == "3.5" ]]; then export PATH="$HOME/miniconda/bin:$PATH"; fi + +pushd "$ROOT_DIR/../src/common" + make test +popd + +pushd "$ROOT_DIR/../src/plasma" + make test +popd + +pushd "$ROOT_DIR/../src/photon" + make test +popd diff --git a/build.sh b/build.sh index ffe36d977..1c4d8155f 100755 --- a/build.sh +++ b/build.sh @@ -41,10 +41,10 @@ pushd "$PLASMA_DIR" make install popd popd -cp "$PLASMA_DIR/build/plasma_store" "$PYTHON_PLASMA_DIR/build/" -cp "$PLASMA_DIR/build/plasma_manager" "$PYTHON_PLASMA_DIR/build/" -cp "$PLASMA_DIR/lib/python/plasma.py" "$PYTHON_PLASMA_DIR/lib/python/" -cp "$PLASMA_DIR/lib/python/libplasma.so" "$PYTHON_PLASMA_DIR/lib/python/" +cp "$PLASMA_DIR/build/plasma_store" "$PYTHON_PLASMA_DIR/" +cp "$PLASMA_DIR/build/plasma_manager" "$PYTHON_PLASMA_DIR/" +cp "$PLASMA_DIR/plasma/plasma.py" "$PYTHON_PLASMA_DIR/" +cp "$PLASMA_DIR/plasma/libplasma.so" "$PYTHON_PLASMA_DIR/" pushd "$PHOTON_DIR" make clean diff --git a/install-dependencies.sh b/install-dependencies.sh deleted file mode 100755 index 546989d15..000000000 --- a/install-dependencies.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env bash - -ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) - -platform="unknown" -unamestr="$(uname)" -if [[ "$unamestr" == "Linux" ]]; then - echo "Platform is linux." - platform="linux" -elif [[ "$unamestr" == "Darwin" ]]; then - echo "Platform is macosx." - platform="macosx" -else - echo "Unrecognized platform." - exit 1 -fi - -if [[ $platform == "linux" ]]; then - # These commands must be kept in sync with the installation instructions. - sudo apt-get update - sudo apt-get install -y cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip nodejs npm - sudo pip install funcsigs colorama psutil redis -elif [[ $platform == "macosx" ]]; then - # check that brew is installed - which -s brew - if [[ $? != 0 ]]; then - echo "Could not find brew, please install brew (see http://brew.sh/)." - exit 1 - else - echo "Updating brew." - brew update - fi - # These commands must be kept in sync with the installation instructions. - brew install cmake automake autoconf libtool boost node - sudo easy_install pip - sudo pip install numpy funcsigs colorama psutil redis --ignore-installed six -fi - -sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. diff --git a/lib/python/global_scheduler/__init__.py b/lib/python/global_scheduler/__init__.py index c92c0e1b6..36b583f9c 100644 --- a/lib/python/global_scheduler/__init__.py +++ b/lib/python/global_scheduler/__init__.py @@ -1 +1,5 @@ -from lib.python.global_scheduler_services import * +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from .lib.python.global_scheduler_services import * diff --git a/lib/python/photon/__init__.py b/lib/python/photon/__init__.py index 73b874cd4..08512ce4d 100644 --- a/lib/python/photon/__init__.py +++ b/lib/python/photon/__init__.py @@ -1 +1 @@ -from photon import * +from .photon import * diff --git a/lib/python/photon/photon/__init__.py b/lib/python/photon/photon/__init__.py index d34d8d3e1..c81d5021f 100644 --- a/lib/python/photon/photon/__init__.py +++ b/lib/python/photon/photon/__init__.py @@ -1,2 +1,6 @@ -from libphoton import * -from photon_services import * +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from .libphoton import * +from .photon_services import * diff --git a/lib/python/plasma/build/.gitkeep b/lib/python/plasma/.gitkeep similarity index 100% rename from lib/python/plasma/build/.gitkeep rename to lib/python/plasma/.gitkeep diff --git a/lib/python/plasma/__init__.py b/lib/python/plasma/__init__.py index af5061d17..6fe09d30f 100644 --- a/lib/python/plasma/__init__.py +++ b/lib/python/plasma/__init__.py @@ -1 +1,5 @@ -from lib.python.plasma import * +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from plasma.plasma import * diff --git a/lib/python/plasma/lib/__init__.py b/lib/python/plasma/lib/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lib/python/plasma/lib/python/__init__.py b/lib/python/plasma/lib/python/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lib/python/ray/array/distributed/__init__.py b/lib/python/ray/array/distributed/__init__.py index d61616df6..2c0ddfffc 100644 --- a/lib/python/ray/array/distributed/__init__.py +++ b/lib/python/ray/array/distributed/__init__.py @@ -1,2 +1,7 @@ -import random, linalg -from core import * +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from . import random +from . import linalg +from .core import * diff --git a/lib/python/ray/array/distributed/linalg.py b/lib/python/ray/array/distributed/linalg.py index 11b9c3454..87ab4c6a1 100644 --- a/lib/python/ray/array/distributed/linalg.py +++ b/lib/python/ray/array/distributed/linalg.py @@ -154,7 +154,7 @@ def qr(a): Ts = [] for i in range(min(a.num_blocks[0], a.num_blocks[1])): # this differs from the paper, which says "for i in range(a.num_blocks[1])", but that doesn't seem to make any sense when a.num_blocks[1] > a.num_blocks[0] - sub_dist_array = subblocks.remote(a_work, range(i, a_work.num_blocks[0]), [i]) + sub_dist_array = subblocks.remote(a_work, list(range(i, a_work.num_blocks[0])), [i]) y, t, _, R = tsqr_hr.remote(sub_dist_array) y_val = ray.get(y) diff --git a/lib/python/ray/array/remote/__init__.py b/lib/python/ray/array/remote/__init__.py index d61616df6..2c0ddfffc 100644 --- a/lib/python/ray/array/remote/__init__.py +++ b/lib/python/ray/array/remote/__init__.py @@ -1,2 +1,7 @@ -import random, linalg -from core import * +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from . import random +from . import linalg +from .core import * diff --git a/lib/python/ray/serialization.py b/lib/python/ray/serialization.py index b91ccad6b..c650bbc85 100644 --- a/lib/python/ray/serialization.py +++ b/lib/python/ray/serialization.py @@ -100,12 +100,14 @@ def serialize(obj): elif class_id in custom_serializers.keys(): serialized_obj = {"data": custom_serializers[class_id](obj)} else: - if not hasattr(obj, "__dict__"): - raise Exception("We do not know how to serialize the object '{}'".format(obj)) - serialized_obj = obj.__dict__ + # Handle the namedtuple case. if is_named_tuple(type(obj)): - # Handle the namedtuple case. + serialized_obj = {} serialized_obj["_ray_getnewargs_"] = obj.__getnewargs__() + elif hasattr(obj, "__dict__"): + serialized_obj = obj.__dict__ + else: + raise Exception("We do not know how to serialize the object '{}'".format(obj)) result = dict(serialized_obj, **{"_pytype_": class_id}) return result @@ -131,11 +133,10 @@ def deserialize(serialized_obj): # In this case, serialized_obj should just be the __dict__ field. if "_ray_getnewargs_" in serialized_obj: obj = cls.__new__(cls, *serialized_obj["_ray_getnewargs_"]) - serialized_obj.pop("_ray_getnewargs_") else: obj = cls.__new__(cls) - serialized_obj.pop("_pytype_") - obj.__dict__.update(serialized_obj) + serialized_obj.pop("_pytype_") + obj.__dict__.update(serialized_obj) return obj # Register the callbacks with numbuf. diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 4ccd84d81..8db23cc6a 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import cloudpickle import hashlib import os import sys @@ -440,11 +441,12 @@ class Worker(object): """ self.plasma_client.fetch([objectid.id()]) buff = self.plasma_client.get(objectid.id()) - metadata = self.plasma_client.get_metadata(objectid.id()) - metadata_size = len(metadata) + metadata_buff = self.plasma_client.get_metadata(objectid.id()) + metadata_size = len(metadata_buff) data = np.frombuffer(buff.buffer, dtype="byte")[8:] + metadata = np.frombuffer(metadata_buff.buffer, dtype="byte") metadata_offset = int(np.frombuffer(buff.buffer, dtype="int64", count=1)[0]) - serialized = numbuf.read_from_buffer(memoryview(data), bytearray(metadata), metadata_offset) + serialized = numbuf.read_from_buffer(memoryview(data), memoryview(metadata), metadata_offset) # Create an ObjectFixture. If the object we are getting is backed by the # PlasmaBuffer, this ObjectFixture will keep the PlasmaBuffer in scope as # long as the object is in scope. @@ -519,7 +521,7 @@ class Worker(object): function_to_run_id = random_string() key = "FunctionsToRun:{}".format(function_to_run_id) self.redis_client.hmset(key, {"function_id": function_to_run_id, - "function": pickling.dumps(function)}) + "function": cloudpickle.dumps(function)}) self.redis_client.rpush("Exports", key) self.driver_export_counter += 1 @@ -580,15 +582,15 @@ def error_info(worker=global_worker): """Return information about failed tasks.""" check_main_thread() check_connected(worker) - result = {"TaskError": [], - "RemoteFunctionImportError": [], - "ReusableVariableImportError": [], - "ReusableVariableReinitializeError": [], - "FunctionToRunError": [] + result = {b"TaskError": [], + b"RemoteFunctionImportError": [], + b"ReusableVariableImportError": [], + b"ReusableVariableReinitializeError": [], + b"FunctionToRunError": [] } error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1) for error_key in error_keys: - error_type = error_key.split(":", 1)[0] + error_type = error_key.split(b":", 1)[0] error_contents = worker.redis_client.hgetall(error_key) result[error_type].append(error_contents) @@ -718,7 +720,9 @@ def fetch_and_register_remote_function(key, worker=global_worker): """Import a remote function.""" function_id_str, function_name, serialized_function, num_return_vals, module, function_export_counter = worker.redis_client.hmget(key, ["function_id", "name", "function", "num_return_vals", "module", "function_export_counter"]) function_id = photon.ObjectID(function_id_str) + function_name = function_name.decode("ascii") num_return_vals = int(num_return_vals) + module = module.decode("ascii") function_export_counter = int(function_export_counter) try: function = pickling.loads(serialized_function) @@ -746,6 +750,7 @@ def fetch_and_register_remote_function(key, worker=global_worker): def fetch_and_register_reusable_variable(key, worker=global_worker): """Import a reusable variable.""" reusable_variable_name, serialized_initializer, serialized_reinitializer = worker.redis_client.hmget(key, ["name", "initializer", "reinitializer"]) + reusable_variable_name = reusable_variable_name.decode("ascii") try: initializer = pickling.loads(serialized_initializer) reinitializer = pickling.loads(serialized_reinitializer) @@ -793,11 +798,11 @@ def import_thread(worker): with worker.lock: export_keys = worker.redis_client.lrange("Exports", 0, -1) for key in export_keys: - if key.startswith("RemoteFunction"): + if key.startswith(b"RemoteFunction"): fetch_and_register_remote_function(key, worker=worker) - elif key.startswith("ReusableVariables"): + elif key.startswith(b"ReusableVariables"): fetch_and_register_reusable_variable(key, worker=worker) - elif key.startswith("FunctionsToRun"): + elif key.startswith(b"FunctionsToRun"): fetch_and_execute_function_to_run(key, worker=worker) else: raise Exception("This code should be unreachable.") @@ -808,16 +813,16 @@ def import_thread(worker): with worker.lock: if msg["type"] == "psubscribe": continue - assert msg["data"] == "rpush" + assert msg["data"] == b"rpush" num_imports = worker.redis_client.llen("Exports") assert num_imports >= worker.worker_import_counter for i in range(worker.worker_import_counter, num_imports): key = worker.redis_client.lindex("Exports", i) - if key.startswith("RemoteFunction"): + if key.startswith(b"RemoteFunction"): fetch_and_register_remote_function(key, worker=worker) - elif key.startswith("ReusableVariables"): + elif key.startswith(b"ReusableVariables"): fetch_and_register_reusable_variable(key, worker=worker) - elif key.startswith("FunctionsToRun"): + elif key.startswith(b"FunctionsToRun"): fetch_and_execute_function_to_run(key, worker=worker) else: raise Exception("This code should be unreachable.") @@ -1125,7 +1130,7 @@ def main_loop(worker=global_worker): # export counter for the task. If not, wait until we have imported enough. while True: with worker.lock: - if worker.functions.has_key(function_id.id()) and (worker.function_export_counters[function_id.id()] <= worker.worker_import_counter): + if function_id.id() in worker.functions and (worker.function_export_counters[function_id.id()] <= worker.worker_import_counter): break time.sleep(0.001) # Execute the task. @@ -1207,7 +1212,7 @@ def remote(*args, **kwargs): def remote_decorator(func): func_name = "{}.{}".format(func.__module__, func.__name__) if func_id is None: - function_id = FunctionID((hashlib.sha256(func_name).digest())[:20]) + function_id = FunctionID((hashlib.sha256(func_name.encode("ascii")).digest())[:20]) else: function_id = func_id @@ -1216,7 +1221,7 @@ def remote(*args, **kwargs): check_main_thread() check_connected() args = list(args) - args.extend([kwargs[keyword] if kwargs.has_key(keyword) else default for keyword, default in keyword_defaults[len(args):]]) # fill in the remaining arguments + args.extend([kwargs[keyword] if keyword in kwargs else default for keyword, default in keyword_defaults[len(args):]]) # fill in the remaining arguments if any([arg is funcsigs._empty for arg in args]): raise Exception("Not enough arguments were provided to {}.".format(func_name)) if _mode() == PYTHON_MODE: @@ -1249,9 +1254,12 @@ def remote(*args, **kwargs): func_invoker.is_remote = True func_name = "{}.{}".format(func.__module__, func.__name__) func_invoker.func_name = func_name - func_invoker.func_doc = func.func_doc + if sys.version_info >= (3, 0): + func_invoker.__doc__ = func.__doc__ + else: + func_invoker.func_doc = func.func_doc - sig_params = [(k, v) for k, v in funcsigs.signature(func).parameters.iteritems()] + sig_params = [(k, v) for k, v in funcsigs.signature(func).parameters.items()] keyword_defaults = [(k, v.default) for k, v in sig_params] has_vararg_param = any([v.kind == v.VAR_POSITIONAL for k, v in sig_params]) func_invoker.has_vararg_param = has_vararg_param @@ -1279,7 +1287,7 @@ def remote(*args, **kwargs): return remote_decorator if _mode() == WORKER_MODE: - if kwargs.has_key("function_id"): + if "function_id" in kwargs: num_return_vals = kwargs["num_return_vals"] function_id = kwargs["function_id"] return make_remote_decorator(num_return_vals, function_id) @@ -1292,9 +1300,9 @@ def remote(*args, **kwargs): else: # This is the case where the decorator is something like # @ray.remote(num_return_vals=2). - assert len(args) == 0 and kwargs.has_key("num_return_vals"), "The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied with only the argument num_return_vals, like '@ray.remote(num_return_vals=2)'." + assert len(args) == 0 and "num_return_vals" in kwargs, "The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied with only the argument num_return_vals, like '@ray.remote(num_return_vals=2)'." num_return_vals = kwargs["num_return_vals"] - assert not kwargs.has_key("function_id") + assert not "function_id" in kwargs return make_remote_decorator(num_return_vals) def check_signature_supported(has_kwargs_param, has_vararg_param, keyword_defaults, name): diff --git a/lib/python/setup.py b/lib/python/setup.py index a79a97654..2f279a569 100644 --- a/lib/python/setup.py +++ b/lib/python/setup.py @@ -24,9 +24,9 @@ setup(name="ray", version="0.0.1", packages=find_packages(), package_data={"common": ["thirdparty/redis/src/redis-server"], - "plasma": ["build/plasma_store", - "build/plasma_manager", - "lib/python/libplasma.so"], + "plasma": ["plasma_store", + "plasma_manager", + "libplasma.so"], "photon": ["build/photon_scheduler", "photon/libphoton.so"], "global_scheduler": ["build/global_scheduler"]}, diff --git a/numbuf/CMakeLists.txt b/numbuf/CMakeLists.txt index 5c2e3e80e..0ee519c18 100644 --- a/numbuf/CMakeLists.txt +++ b/numbuf/CMakeLists.txt @@ -25,8 +25,9 @@ message(STATUS "PYTHON_INCLUDE_DIRS: " ${PYTHON_INCLUDE_DIRS}) execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c "import sys; print(sys.exec_prefix)" OUTPUT_VARIABLE PYTHON_PREFIX OUTPUT_STRIP_TRAILING_WHITESPACE) message(STATUS "PYTHON_PREFIX: " ${PYTHON_PREFIX}) +# The name ending in "m" is for miniconda. FIND_LIBRARY(PYTHON_LIBRARIES - NAMES ${PYTHON_LIBRARY_NAME} + NAMES "${PYTHON_LIBRARY_NAME}" "${PYTHON_LIBRARY_NAME}m" HINTS "${PYTHON_PREFIX}" PATH_SUFFIXES "lib" "libs" NO_DEFAULT_PATH) @@ -35,8 +36,9 @@ message(STATUS "PYTHON_LIBRARIES: " ${PYTHON_LIBRARIES}) # the Python include directories. if(NOT PYTHON_LIBRARIES) message(STATUS "Failed to find PYTHON_LIBRARIES near the Python executable, so now looking near the Python include directories.") + # The name ending in "m" is for miniconda. FIND_LIBRARY(PYTHON_LIBRARIES - NAMES ${PYTHON_LIBRARY_NAME} + NAMES "${PYTHON_LIBRARY_NAME}" "${PYTHON_LIBRARY_NAME}m" HINTS "${PYTHON_INCLUDE_DIRS}/../.." PATH_SUFFIXES "lib" "libs" NO_DEFAULT_PATH) diff --git a/numbuf/python/src/pynumbuf/adapters/python.cc b/numbuf/python/src/pynumbuf/adapters/python.cc index 8bebe2810..0e98f07d8 100644 --- a/numbuf/python/src/pynumbuf/adapters/python.cc +++ b/numbuf/python/src/pynumbuf/adapters/python.cc @@ -16,6 +16,10 @@ extern PyObject* numbuf_deserialize_callback; namespace numbuf { +#if PY_MAJOR_VERSION >= 3 +#define PyInt_FromLong PyLong_FromLong +#endif + Status get_value( ArrayPtr arr, int32_t index, int32_t type, PyObject* base, PyObject** result) { switch (arr->type()->type) { @@ -30,7 +34,7 @@ Status get_value( int32_t nchars; const uint8_t* str = std::static_pointer_cast(arr)->GetValue(index, &nchars); - *result = PyString_FromStringAndSize(reinterpret_cast(str), nchars); + *result = PyBytes_FromStringAndSize(reinterpret_cast(str), nchars); return Status::OK(); } case Type::STRING: { @@ -82,24 +86,25 @@ Status append(PyObject* elem, SequenceBuilder& builder, std::vector& int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow); RETURN_NOT_OK(builder.AppendInt64(data)); if (overflow) { return Status::NotImplemented("long overflow"); } +#if PY_MAJOR_VERSION < 3 } else if (PyInt_Check(elem)) { RETURN_NOT_OK(builder.AppendInt64(static_cast(PyInt_AS_LONG(elem)))); - } else if (PyString_Check(elem)) { - auto data = reinterpret_cast(PyString_AS_STRING(elem)); - auto size = PyString_GET_SIZE(elem); +#endif + } else if (PyBytes_Check(elem)) { + auto data = reinterpret_cast(PyBytes_AS_STRING(elem)); + auto size = PyBytes_GET_SIZE(elem); RETURN_NOT_OK(builder.AppendBytes(data, size)); } else if (PyUnicode_Check(elem)) { Py_ssize_t size; #if PY_MAJOR_VERSION >= 3 - char* data = - PyUnicode_AsUTF8AndSize(elem, &size); // TODO(pcm): Check if this is correct + char* data = PyUnicode_AsUTF8AndSize(elem, &size); #else PyObject* str = PyUnicode_AsUTF8String(elem); char* data = PyString_AS_STRING(str); size = PyString_GET_SIZE(str); + Py_XDECREF(str); #endif Status s = builder.AppendString(data, size); - Py_XDECREF(str); RETURN_NOT_OK(s); } else if (PyList_Check(elem)) { builder.AppendList(PyList_Size(elem)); @@ -119,7 +124,7 @@ Status append(PyObject* elem, SequenceBuilder& builder, std::vector& } else { if (!numbuf_serialize_callback) { std::stringstream ss; - ss << "data type of " << PyString_AS_STRING(PyObject_Repr(elem)) + ss << "data type of " << PyBytes_AS_STRING(PyObject_Repr(elem)) << " not recognized and custom serialization handler not registered"; return Status::NotImplemented(ss.str()); } else { @@ -246,7 +251,7 @@ Status SerializeDict( // This block is used to decrement the reference counts of the results // returned by the serialization callback, which is called in SerializeArray // in numpy.cc as well as in DeserializeDict and in append in this file. - static PyObject* py_type = PyString_FromString("_pytype_"); + static PyObject* py_type = PyUnicode_FromString("_pytype_"); for (const auto& dict : dicts) { if (PyDict_Contains(dict, py_type)) { // If the dictionary contains the key "_pytype_", then the user has to @@ -273,7 +278,7 @@ Status DeserializeDict(std::shared_ptr array, int32_t start_idx, int32_t } Py_XDECREF(keys); // PyList_GetItem(keys, ...) incremented the reference count Py_XDECREF(vals); // PyList_GetItem(vals, ...) incremented the reference count - static PyObject* py_type = PyString_FromString("_pytype_"); + static PyObject* py_type = PyUnicode_FromString("_pytype_"); if (PyDict_Contains(result, py_type) && numbuf_deserialize_callback) { PyObject* arglist = Py_BuildValue("(O)", result); // The result of the call to PyObject_CallObject will be passed to Python diff --git a/numbuf/python/src/pynumbuf/numbuf.cc b/numbuf/python/src/pynumbuf/numbuf.cc index 70be81b26..c1d509ba3 100644 --- a/numbuf/python/src/pynumbuf/numbuf.cc +++ b/numbuf/python/src/pynumbuf/numbuf.cc @@ -5,6 +5,8 @@ #define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API #include +#include "bytesobject.h" + #include #include @@ -72,7 +74,7 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) { PyObject* r = PyTuple_New(3); PyTuple_SetItem(r, 0, PyByteArray_FromStringAndSize(ptr, buffer->size())); - PyTuple_SetItem(r, 1, PyInt_FromLong(size)); + PyTuple_SetItem(r, 1, PyLong_FromLong(size)); PyTuple_SetItem(r, 2, PyCapsule_New(reinterpret_cast(batch), "arrow", &ArrowCapsule_Destructor)); return r; @@ -95,20 +97,22 @@ static PyObject* write_to_buffer(PyObject* self, PyObject* args) { int64_t header_end_offset; ARROW_CHECK_OK(ipc::WriteRecordBatch((*batch)->columns(), (*batch)->num_rows(), target.get(), &body_end_offset, &header_end_offset)); - return PyInt_FromLong(header_end_offset); + return PyLong_FromLong(header_end_offset); } /* Documented in doc/numbuf.rst in ray-core */ static PyObject* read_from_buffer(PyObject* self, PyObject* args) { - PyObject* memoryview; - PyObject* metadata; + PyObject* data_memoryview; + PyObject* metadata_memoryview; int64_t metadata_offset; - if (!PyArg_ParseTuple(args, "OOL", &memoryview, &metadata, &metadata_offset)) { + if (!PyArg_ParseTuple( + args, "OOL", &data_memoryview, &metadata_memoryview, &metadata_offset)) { return NULL; } - auto ptr = reinterpret_cast(PyByteArray_AsString(metadata)); - auto schema_buffer = std::make_shared(ptr, PyByteArray_Size(metadata)); + Py_buffer* metadata_buffer = PyMemoryView_GET_BUFFER(metadata_memoryview); + auto ptr = reinterpret_cast(metadata_buffer->buf); + auto schema_buffer = std::make_shared(ptr, metadata_buffer->len); std::shared_ptr message; ARROW_CHECK_OK(ipc::Message::Open(schema_buffer, &message)); DCHECK_EQ(ipc::Message::SCHEMA, message->type()); @@ -116,7 +120,7 @@ static PyObject* read_from_buffer(PyObject* self, PyObject* args) { std::shared_ptr schema; ARROW_CHECK_OK(schema_msg->GetSchema(&schema)); - Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview); + Py_buffer* buffer = PyMemoryView_GET_BUFFER(data_memoryview); auto source = std::make_shared( reinterpret_cast(buffer->buf), buffer->len); std::shared_ptr reader; @@ -180,13 +184,54 @@ static PyMethodDef NumbufMethods[] = { "set serialization and deserialization callbacks"}, {NULL, NULL, 0, NULL}}; -PyMODINIT_FUNC initlibnumbuf(void) { - PyObject* m; - m = Py_InitModule3("libnumbuf", NumbufMethods, "Python C Extension for Numbuf"); +// clang-format off +#if PY_MAJOR_VERSION >= 3 +static struct PyModuleDef moduledef = { + PyModuleDef_HEAD_INIT, + "libnumbuf", /* m_name */ + "Python C Extension for Numbuf", /* m_doc */ + 0, /* m_size */ + NumbufMethods, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ +}; +#endif +// clang-format on + +#if PY_MAJOR_VERSION >= 3 +#define INITERROR return NULL +#else +#define INITERROR return +#endif + +#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */ +#define PyMODINIT_FUNC void +#endif + +#if PY_MAJOR_VERSION >= 3 +#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void) +#else +#define MOD_INIT(name) PyMODINIT_FUNC init##name(void) +#endif + +MOD_INIT(libnumbuf) { +#if PY_MAJOR_VERSION >= 3 + PyObject* m = PyModule_Create(&moduledef); +#else + PyObject* m = + Py_InitModule3("libnumbuf", NumbufMethods, "Python C Extension for Numbuf"); +#endif + char numbuf_error[] = "numbuf.error"; NumbufError = PyErr_NewException(numbuf_error, NULL, NULL); Py_INCREF(NumbufError); PyModule_AddObject(m, "numbuf_error", NumbufError); import_array(); + +#if PY_MAJOR_VERSION >= 3 + return m; +#endif } } diff --git a/numbuf/python/test/runtest.py b/numbuf/python/test/runtest.py index 59afd9d7a..e90067967 100644 --- a/numbuf/python/test/runtest.py +++ b/numbuf/python/test/runtest.py @@ -6,10 +6,11 @@ import unittest import numbuf import numpy as np from numpy.testing import assert_equal +import sys -TEST_OBJECTS = [{(1,2) : 1}, {() : 2}, [1, "hello", 3.0], 42, 43L, "hello world", +TEST_OBJECTS = [{(1,2) : 1}, {() : 2}, [1, "hello", 3.0], 42, 43, "hello world", u"x", u"\u262F", 42.0, - 1L << 62, (1.0, "hi"), + 1 << 62, (1.0, "hi"), None, (None, None), ("hello", None), True, False, (True, False), "hello", {True: "hello", False: "world"}, @@ -18,6 +19,9 @@ TEST_OBJECTS = [{(1,2) : 1}, {() : 2}, [1, "hello", 3.0], 42, 43L, "hello world" np.uint8(3), np.uint32(4), np.uint64(5), np.float32(1.0), np.float64(1.0)] +if sys.version_info < (3, 0): + TEST_OBJECTS += [long(42), long(1 << 62)] + class SerializationTests(unittest.TestCase): def roundTripTest(self, data): @@ -110,7 +114,7 @@ class SerializationTests(unittest.TestCase): size = size + 4096 # INITIAL_METADATA_SIZE in arrow buff = np.zeros(size, dtype="uint8") metadata_offset = numbuf.write_to_buffer(batch, memoryview(buff)) - array = numbuf.read_from_buffer(memoryview(buff), schema, metadata_offset) + array = numbuf.read_from_buffer(memoryview(buff), memoryview(schema), metadata_offset) result = numbuf.deserialize_list(array) assert_equal(result[0], obj) diff --git a/src/common/lib/python/common_extension.c b/src/common/lib/python/common_extension.c index 1dd8726f3..591526923 100644 --- a/src/common/lib/python/common_extension.c +++ b/src/common/lib/python/common_extension.c @@ -1,4 +1,5 @@ #include +#include "bytesobject.h" #include "node.h" #include "common.h" @@ -17,12 +18,19 @@ PyObject *pickle_dumps = NULL; PyObject *pickle_protocol = NULL; void init_pickle_module(void) { - /* For Python 3 this needs to be "_pickle" instead of "cPickle". */ +#if PY_MAJOR_VERSION >= 3 + pickle_module = PyImport_ImportModule("pickle"); +#else pickle_module = PyImport_ImportModuleNoBlock("cPickle"); - pickle_loads = PyString_FromString("loads"); - pickle_dumps = PyString_FromString("dumps"); - pickle_protocol = PyObject_GetAttrString(pickle_module, "HIGHEST_PROTOCOL"); +#endif CHECK(pickle_module != NULL); + CHECK(PyObject_HasAttrString(pickle_module, "loads")); + CHECK(PyObject_HasAttrString(pickle_module, "dumps")); + CHECK(PyObject_HasAttrString(pickle_module, "HIGHEST_PROTOCOL")); + pickle_loads = PyUnicode_FromString("loads"); + pickle_dumps = PyUnicode_FromString("dumps"); + pickle_protocol = PyObject_GetAttrString(pickle_module, "HIGHEST_PROTOCOL"); + CHECK(pickle_protocol != NULL); } /* Define the PyObjectID class. */ @@ -62,8 +70,8 @@ PyObject *PyObjectID_make(object_id object_id) { static PyObject *PyObjectID_id(PyObject *self) { PyObjectID *s = (PyObjectID *) self; - return PyString_FromStringAndSize((char *) &s->object_id.id[0], - sizeof(s->object_id.id)); + return PyBytes_FromStringAndSize((char *) &s->object_id.id[0], + sizeof(s->object_id.id)); } static PyObject *PyObjectID_richcompare(PyObjectID *self, @@ -106,7 +114,7 @@ static PyObject *PyObjectID_richcompare(PyObjectID *self, static long PyObjectID_hash(PyObjectID *self) { PyObject *tuple = PyTuple_New(UNIQUE_ID_SIZE); for (int i = 0; i < UNIQUE_ID_SIZE; ++i) { - PyTuple_SetItem(tuple, i, PyInt_FromLong(self->object_id.id[i])); + PyTuple_SetItem(tuple, i, PyLong_FromLong(self->object_id.id[i])); } long hash = PyObject_Hash(tuple); Py_XDECREF(tuple); @@ -120,7 +128,7 @@ static PyObject *PyObjectID_repr(PyObjectID *self) { UT_string *repr; utstring_new(repr); utstring_printf(repr, "ObjectID(%s)", hex_id); - PyObject *result = PyString_FromString(utstring_body(repr)); + PyObject *result = PyUnicode_FromString(utstring_body(repr)); utstring_free(repr); return result; } @@ -144,7 +152,7 @@ static PyMemberDef PyObjectID_members[] = { }; PyTypeObject PyObjectIDType = { - PyObject_HEAD_INIT(NULL) 0, /* ob_size */ + PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */ "common.ObjectID", /* tp_name */ sizeof(PyObjectID), /* tp_basicsize */ 0, /* tp_itemsize */ @@ -203,17 +211,17 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { &parent_task_id, &parent_counter)) { return -1; } - size_t size = PyList_Size(arguments); + Py_ssize_t size = PyList_Size(arguments); /* Determine the size of pass by value data in bytes. */ - size_t value_data_bytes = 0; - for (size_t i = 0; i < size; ++i) { + Py_ssize_t value_data_bytes = 0; + for (Py_ssize_t i = 0; i < size; ++i) { PyObject *arg = PyList_GetItem(arguments, i); if (!PyObject_IsInstance(arg, (PyObject *) &PyObjectIDType)) { CHECK(pickle_module != NULL); CHECK(pickle_dumps != NULL); PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps, arg, pickle_protocol, NULL); - value_data_bytes += PyString_Size(data); + value_data_bytes += PyBytes_Size(data); utarray_push_back(val_repr_ptrs, &data); } } @@ -223,15 +231,17 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { start_construct_task_spec(parent_task_id, parent_counter, function_id, size, num_returns, value_data_bytes); /* Add the task arguments. */ - for (size_t i = 0; i < size; ++i) { + for (Py_ssize_t i = 0; i < size; ++i) { PyObject *arg = PyList_GetItem(arguments, i); if (PyObject_IsInstance(arg, (PyObject *) &PyObjectIDType)) { task_args_add_ref(self->spec, ((PyObjectID *) arg)->object_id); } else { - PyObject *data = - *((PyObject **) utarray_eltptr(val_repr_ptrs, val_repr_index)); - task_args_add_val(self->spec, (uint8_t *) PyString_AS_STRING(data), - PyString_GET_SIZE(data)); + /* We do this check because we cast a signed int to an unsigned int. */ + CHECK(val_repr_index >= 0); + PyObject *data = *((PyObject **) utarray_eltptr( + val_repr_ptrs, (uint64_t) val_repr_index)); + task_args_add_val(self->spec, (uint8_t *) PyBytes_AS_STRING(data), + PyBytes_GET_SIZE(data)); Py_DECREF(data); val_repr_index += 1; } @@ -269,8 +279,8 @@ static PyObject *PyTask_arguments(PyObject *self) { CHECK(pickle_module != NULL); CHECK(pickle_loads != NULL); PyObject *str = - PyString_FromStringAndSize((char *) task_arg_val(task, i), - (Py_ssize_t) task_arg_length(task, i)); + PyBytes_FromStringAndSize((char *) task_arg_val(task, i), + (Py_ssize_t) task_arg_length(task, i)); PyObject *val = PyObject_CallMethodObjArgs(pickle_module, pickle_loads, str, NULL); Py_XDECREF(str); @@ -304,44 +314,44 @@ static PyMethodDef PyTask_methods[] = { }; PyTypeObject PyTaskType = { - PyObject_HEAD_INIT(NULL) 0, /* ob_size */ - "task.Task", /* tp_name */ - sizeof(PyTask), /* tp_basicsize */ - 0, /* tp_itemsize */ - (destructor) PyTask_dealloc, /* tp_dealloc */ - 0, /* tp_print */ - 0, /* tp_getattr */ - 0, /* tp_setattr */ - 0, /* tp_compare */ - 0, /* tp_repr */ - 0, /* tp_as_number */ - 0, /* tp_as_sequence */ - 0, /* tp_as_mapping */ - 0, /* tp_hash */ - 0, /* tp_call */ - 0, /* tp_str */ - 0, /* tp_getattro */ - 0, /* tp_setattro */ - 0, /* tp_as_buffer */ - Py_TPFLAGS_DEFAULT, /* tp_flags */ - "Task object", /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - PyTask_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - (initproc) PyTask_init, /* tp_init */ - 0, /* tp_alloc */ - PyType_GenericNew, /* tp_new */ + PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */ + "task.Task", /* tp_name */ + sizeof(PyTask), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor) PyTask_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "Task object", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + PyTask_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc) PyTask_init, /* tp_init */ + 0, /* tp_alloc */ + PyType_GenericNew, /* tp_new */ }; /* Create a PyTask from a C struct. The resulting PyTask takes ownership of the @@ -358,6 +368,10 @@ PyObject *PyTask_make(task_spec *task_spec) { #define SIZE_LIMIT 100 #define NUM_ELEMENTS_LIMIT 1000 +#if PY_MAJOR_VERSION >= 3 +#define PyInt_Check PyLong_Check +#endif + /** * This method checks if a Python object is sufficiently simple that it can be * serialized and passed by value as an argument to a task (without being put in @@ -382,8 +396,8 @@ int is_simple_value(PyObject *value, int *num_elements_contained) { value == Py_True || PyFloat_Check(value) || value == Py_None) { return 1; } - if (PyString_CheckExact(value)) { - *num_elements_contained += PyString_Size(value); + if (PyBytes_CheckExact(value)) { + *num_elements_contained += PyBytes_Size(value); return (*num_elements_contained < NUM_ELEMENTS_LIMIT); } if (PyUnicode_CheckExact(value)) { @@ -391,7 +405,7 @@ int is_simple_value(PyObject *value, int *num_elements_contained) { return (*num_elements_contained < NUM_ELEMENTS_LIMIT); } if (PyList_CheckExact(value) && PyList_Size(value) < SIZE_LIMIT) { - for (size_t i = 0; i < PyList_Size(value); ++i) { + for (Py_ssize_t i = 0; i < PyList_Size(value); ++i) { if (!is_simple_value(PyList_GetItem(value, i), num_elements_contained)) { return 0; } @@ -410,7 +424,7 @@ int is_simple_value(PyObject *value, int *num_elements_contained) { return (*num_elements_contained < NUM_ELEMENTS_LIMIT); } if (PyTuple_CheckExact(value) && PyTuple_Size(value) < SIZE_LIMIT) { - for (size_t i = 0; i < PyTuple_Size(value); ++i) { + for (Py_ssize_t i = 0; i < PyTuple_Size(value); ++i) { if (!is_simple_value(PyTuple_GetItem(value, i), num_elements_contained)) { return 0; } diff --git a/src/common/lib/python/common_module.c b/src/common/lib/python/common_module.c index a1e6aea6c..1ee0e3f50 100644 --- a/src/common/lib/python/common_module.c +++ b/src/common/lib/python/common_module.c @@ -10,21 +10,53 @@ static PyMethodDef common_methods[] = { {NULL} /* Sentinel */ }; +#if PY_MAJOR_VERSION >= 3 +static struct PyModuleDef moduledef = { + PyModuleDef_HEAD_INIT, + "common", /* m_name */ + "A module for common types. This is used for testing.", /* m_doc */ + 0, /* m_size */ + common_methods, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ +}; +#endif + +#if PY_MAJOR_VERSION >= 3 +#define INITERROR return NULL +#else +#define INITERROR return +#endif + #ifndef PyMODINIT_FUNC /* declarations for DLL import/export */ #define PyMODINIT_FUNC void #endif -PyMODINIT_FUNC initcommon(void) { +#if PY_MAJOR_VERSION >= 3 +#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void) +#else +#define MOD_INIT(name) PyMODINIT_FUNC init##name(void) +#endif + +MOD_INIT(common) { PyObject *m; - if (PyType_Ready(&PyTaskType) < 0) - return; + if (PyType_Ready(&PyTaskType) < 0) { + INITERROR; + } - if (PyType_Ready(&PyObjectIDType) < 0) - return; + if (PyType_Ready(&PyObjectIDType) < 0) { + INITERROR; + } +#if PY_MAJOR_VERSION >= 3 + m = PyModule_Create(&moduledef); +#else m = Py_InitModule3("common", common_methods, "A module for common types. This is used for testing."); +#endif init_pickle_module(); @@ -38,4 +70,8 @@ PyMODINIT_FUNC initcommon(void) { CommonError = PyErr_NewException(common_error, NULL, NULL); Py_INCREF(CommonError); PyModule_AddObject(m, "common_error", CommonError); + +#if PY_MAJOR_VERSION >= 3 + return m; +#endif } diff --git a/src/common/redis_module/runtest.py b/src/common/redis_module/runtest.py index e293fa91d..0ff9fdce3 100644 --- a/src/common/redis_module/runtest.py +++ b/src/common/redis_module/runtest.py @@ -63,11 +63,11 @@ class TestGlobalStateStore(unittest.TestCase): self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id1") self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id2") response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1") - self.assertEqual(set(response), {"manager_id1", "manager_id2"}) + self.assertEqual(set(response), {b"manager_id1", b"manager_id2"}) # Add a manager that already exists again and try again. self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id2") response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1") - self.assertEqual(set(response), {"manager_id1", "manager_id2"}) + self.assertEqual(set(response), {b"manager_id1", b"manager_id2"}) def testObjectTableSubscribe(self): p = self.redis.pubsub() @@ -77,7 +77,7 @@ class TestGlobalStateStore(unittest.TestCase): # Receive the acknowledgement message. self.assertEqual(p.get_message()["data"], 1) # Receive the actual data. - self.assertEqual(p.get_message()["data"], "MANAGERS manager_id1") + self.assertEqual(p.get_message()["data"], b"MANAGERS manager_id1") def testResultTableAddAndLookup(self): response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id1") @@ -87,10 +87,10 @@ class TestGlobalStateStore(unittest.TestCase): self.assertEqual(set(response), set([])) self.redis.execute_command("RAY.RESULT_TABLE_ADD", "object_id1", "task_id1") response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id1") - self.assertEqual(response, "task_id1") + self.assertEqual(response, b"task_id1") self.redis.execute_command("RAY.RESULT_TABLE_ADD", "object_id2", "task_id2") response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id2") - self.assertEqual(response, "task_id2") + self.assertEqual(response, b"task_id2") if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/src/common/test/test.py b/src/common/test/test.py index e38efd6f3..d4683ffec 100644 --- a/src/common/test/test.py +++ b/src/common/test/test.py @@ -4,6 +4,7 @@ from __future__ import print_function import numpy as np import pickle +import sys import unittest import common @@ -20,10 +21,13 @@ def random_task_id(): return common.ObjectID(np.random.bytes(ID_SIZE)) BASE_SIMPLE_OBJECTS = [ - 0, 1, 100000, 0L, 1L, 100000L, 1L << 100, 0.0, 0.5, 0.9, 100000.1, (), [], {}, + 0, 1, 100000, 0.0, 0.5, 0.9, 100000.1, (), [], {}, "", 990 * "h", u"", 990 * u"h" ] +if sys.version_info < (3, 0): + BASE_SIMPLE_OBJECTS += [long(0), long(1), long(100000), long(1 << 100)] + LIST_SIMPLE_OBJECTS = [[obj] for obj in BASE_SIMPLE_OBJECTS] TUPLE_SIMPLE_OBJECTS = [(obj,) for obj in BASE_SIMPLE_OBJECTS] DICT_SIMPLE_OBJECTS = [{(): obj} for obj in BASE_SIMPLE_OBJECTS] @@ -85,16 +89,17 @@ class TestObjectID(unittest.TestCase): self.assertRaises(Exception, lambda : pickling.dumps(h)) def test_equality_comparisons(self): - x1 = common.ObjectID(ID_SIZE * "a") - x2 = common.ObjectID(ID_SIZE * "a") - y1 = common.ObjectID(ID_SIZE * "b") - y2 = common.ObjectID(ID_SIZE * "b") + x1 = common.ObjectID(ID_SIZE * b"a") + x2 = common.ObjectID(ID_SIZE * b"a") + y1 = common.ObjectID(ID_SIZE * b"b") + y2 = common.ObjectID(ID_SIZE * b"b") self.assertEqual(x1, x2) self.assertEqual(y1, y2) self.assertNotEqual(x1, y1) - object_ids1 = [common.ObjectID(ID_SIZE * chr(i)) for i in range(256)] - object_ids2 = [common.ObjectID(ID_SIZE * chr(i)) for i in range(256)] + random_strings = [np.random.bytes(ID_SIZE) for _ in range(256)] + object_ids1 = [common.ObjectID(random_strings[i]) for i in range(256)] + object_ids2 = [common.ObjectID(random_strings[i]) for i in range(256)] self.assertEqual(len(set(object_ids1)), 256) self.assertEqual(len(set(object_ids1 + object_ids2)), 256) self.assertEqual(set(object_ids1), set(object_ids2)) @@ -122,7 +127,7 @@ class TestTask(unittest.TestCase): 10 * ["a"], 100 * ["a"], 1000 * ["a"], - [1, 1.3, 2L, 1L << 100, "hi", u"hi", [1, 2]], + [1, 1.3, 2, 1 << 100, "hi", u"hi", [1, 2]], object_ids[:1], object_ids[:2], object_ids[:3], diff --git a/src/global_scheduler/test/test.py b/src/global_scheduler/test/test.py index 4cd16f677..05f393daa 100644 --- a/src/global_scheduler/test/test.py +++ b/src/global_scheduler/test/test.py @@ -102,7 +102,7 @@ class TestGlobalScheduler(unittest.TestCase): self.assertLessEqual(len(task_entries), 1) if len(task_entries) == 1: task_contents = self.redis_client.hgetall(task_entries[0]) - task_status = int(task_contents["state"]) + task_status = int(task_contents[b"state"]) self.assertTrue(task_status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED]) if task_status == TASK_STATUS_SCHEDULED: break @@ -120,7 +120,7 @@ class TestGlobalScheduler(unittest.TestCase): self.assertLessEqual(len(task_entries), num_tasks + 1) if len(task_entries) == num_tasks + 1: task_contents = [self.redis_client.hgetall(task_entries[i]) for i in range(len(task_entries))] - task_statuses = [int(contents["state"]) for contents in task_contents] + task_statuses = [int(contents[b"state"]) for contents in task_contents] self.assertTrue(all([status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED] for status in task_statuses])) if all([status == TASK_STATUS_SCHEDULED for status in task_statuses]): break diff --git a/src/photon/CMakeLists.txt b/src/photon/CMakeLists.txt index 168c1d2e4..f7141baf8 100644 --- a/src/photon/CMakeLists.txt +++ b/src/photon/CMakeLists.txt @@ -19,8 +19,9 @@ message(STATUS "PYTHON_INCLUDE_DIRS: " ${PYTHON_INCLUDE_DIRS}) execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c "import sys; print(sys.exec_prefix)" OUTPUT_VARIABLE PYTHON_PREFIX OUTPUT_STRIP_TRAILING_WHITESPACE) message(STATUS "PYTHON_PREFIX: " ${PYTHON_PREFIX}) +# The name ending in "m" is for miniconda. FIND_LIBRARY(PYTHON_LIBRARIES - NAMES ${PYTHON_LIBRARY_NAME} + NAMES "${PYTHON_LIBRARY_NAME}" "${PYTHON_LIBRARY_NAME}m" HINTS "${PYTHON_PREFIX}" PATH_SUFFIXES "lib" "libs" NO_DEFAULT_PATH) @@ -29,8 +30,9 @@ message(STATUS "PYTHON_LIBRARIES: " ${PYTHON_LIBRARIES}) # the Python include directories. if(NOT PYTHON_LIBRARIES) message(STATUS "Failed to find PYTHON_LIBRARIES near the Python executable, so now looking near the Python include directories.") + # The name ending in "m" is for miniconda. FIND_LIBRARY(PYTHON_LIBRARIES - NAMES ${PYTHON_LIBRARY_NAME} + NAMES "${PYTHON_LIBRARY_NAME}" "${PYTHON_LIBRARY_NAME}m" HINTS "${PYTHON_INCLUDE_DIRS}/../.." PATH_SUFFIXES "lib" "libs" NO_DEFAULT_PATH) diff --git a/src/photon/photon/__init__.py b/src/photon/photon/__init__.py index 6a24ca4e0..c81d5021f 100644 --- a/src/photon/photon/__init__.py +++ b/src/photon/photon/__init__.py @@ -2,5 +2,5 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -from libphoton import * -from photon_services import * +from .libphoton import * +from .photon_services import * diff --git a/src/photon/photon_extension.c b/src/photon/photon_extension.c index 599743fba..23b8b3a49 100644 --- a/src/photon/photon_extension.c +++ b/src/photon/photon_extension.c @@ -73,7 +73,7 @@ static PyMethodDef PyPhotonClient_methods[] = { }; static PyTypeObject PyPhotonClientType = { - PyObject_HEAD_INIT(NULL) 0, /* ob_size */ + PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */ "photon.PhotonClient", /* tp_name */ sizeof(PyPhotonClient), /* tp_basicsize */ 0, /* tp_itemsize */ @@ -121,24 +121,55 @@ static PyMethodDef photon_methods[] = { {NULL} /* Sentinel */ }; +#if PY_MAJOR_VERSION >= 3 +static struct PyModuleDef moduledef = { + PyModuleDef_HEAD_INIT, + "libphoton", /* m_name */ + "A module for the local scheduler.", /* m_doc */ + 0, /* m_size */ + photon_methods, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ +}; +#endif + +#if PY_MAJOR_VERSION >= 3 +#define INITERROR return NULL +#else +#define INITERROR return +#endif + #ifndef PyMODINIT_FUNC /* declarations for DLL import/export */ #define PyMODINIT_FUNC void #endif -PyMODINIT_FUNC initlibphoton(void) { - PyObject *m; +#if PY_MAJOR_VERSION >= 3 +#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void) +#else +#define MOD_INIT(name) PyMODINIT_FUNC init##name(void) +#endif - if (PyType_Ready(&PyTaskType) < 0) - return; +MOD_INIT(libphoton) { + if (PyType_Ready(&PyTaskType) < 0) { + INITERROR; + } - if (PyType_Ready(&PyObjectIDType) < 0) - return; + if (PyType_Ready(&PyObjectIDType) < 0) { + INITERROR; + } - if (PyType_Ready(&PyPhotonClientType) < 0) - return; + if (PyType_Ready(&PyPhotonClientType) < 0) { + INITERROR; + } - m = Py_InitModule3("libphoton", photon_methods, - "A module for the local scheduler."); +#if PY_MAJOR_VERSION >= 3 + PyObject *m = PyModule_Create(&moduledef); +#else + PyObject *m = Py_InitModule3("libphoton", photon_methods, + "A module for the local scheduler."); +#endif init_pickle_module(); @@ -155,4 +186,8 @@ PyMODINIT_FUNC initlibphoton(void) { PhotonError = PyErr_NewException(photon_error, NULL, NULL); Py_INCREF(PhotonError); PyModule_AddObject(m, "photon_error", PhotonError); + +#if PY_MAJOR_VERSION >= 3 + return m; +#endif } diff --git a/src/photon/test/test.py b/src/photon/test/test.py index e9d326d21..d624d4d79 100644 --- a/src/photon/test/test.py +++ b/src/photon/test/test.py @@ -74,7 +74,7 @@ class TestPhotonClient(unittest.TestCase): 10 * ["a"], 100 * ["a"], 1000 * ["a"], - [1, 1.3, 2L, 1L << 100, "hi", u"hi", [1, 2]], + [1, 1.3, 1 << 100, "hi", u"hi", [1, 2]], object_ids[:1], object_ids[:2], object_ids[:3], diff --git a/src/plasma/CMakeLists.txt b/src/plasma/CMakeLists.txt index 6c75c3017..1bff5fde4 100644 --- a/src/plasma/CMakeLists.txt +++ b/src/plasma/CMakeLists.txt @@ -19,8 +19,9 @@ message(STATUS "PYTHON_INCLUDE_DIRS: " ${PYTHON_INCLUDE_DIRS}) execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c "import sys; print(sys.exec_prefix)" OUTPUT_VARIABLE PYTHON_PREFIX OUTPUT_STRIP_TRAILING_WHITESPACE) message(STATUS "PYTHON_PREFIX: " ${PYTHON_PREFIX}) +# The name ending in "m" is for miniconda. FIND_LIBRARY(PYTHON_LIBRARIES - NAMES ${PYTHON_LIBRARY_NAME} + NAMES "${PYTHON_LIBRARY_NAME}" "${PYTHON_LIBRARY_NAME}m" HINTS "${PYTHON_PREFIX}" PATH_SUFFIXES "lib" "libs" NO_DEFAULT_PATH) @@ -29,8 +30,9 @@ message(STATUS "PYTHON_LIBRARIES: " ${PYTHON_LIBRARIES}) # the Python include directories. if(NOT PYTHON_LIBRARIES) message(STATUS "Failed to find PYTHON_LIBRARIES near the Python executable, so now looking near the Python include directories.") + # The name ending in "m" is for miniconda. FIND_LIBRARY(PYTHON_LIBRARIES - NAMES ${PYTHON_LIBRARY_NAME} + NAMES "${PYTHON_LIBRARY_NAME}" "${PYTHON_LIBRARY_NAME}m" HINTS "${PYTHON_INCLUDE_DIRS}/../.." PATH_SUFFIXES "lib" "libs" NO_DEFAULT_PATH) @@ -89,4 +91,4 @@ endif(APPLE) target_link_libraries(plasma ${COMMON_LIB} ${PYTHON_LIBRARIES}) -install(TARGETS plasma DESTINATION ${CMAKE_SOURCE_DIR}/lib/python) +install(TARGETS plasma DESTINATION ${CMAKE_SOURCE_DIR}/plasma) diff --git a/src/plasma/plasma/__init__.py b/src/plasma/plasma/__init__.py new file mode 100644 index 000000000..6fe09d30f --- /dev/null +++ b/src/plasma/plasma/__init__.py @@ -0,0 +1,5 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from plasma.plasma import * diff --git a/src/plasma/lib/python/plasma.py b/src/plasma/plasma/plasma.py similarity index 96% rename from src/plasma/lib/python/plasma.py rename to src/plasma/plasma/plasma.py index 62d275de4..f7d66592c 100644 --- a/src/plasma/lib/python/plasma.py +++ b/src/plasma/plasma/plasma.py @@ -5,6 +5,7 @@ from __future__ import print_function import os import random import subprocess +import sys import time from . import libplasma @@ -42,13 +43,18 @@ class PlasmaBuffer(object): def __getitem__(self, index): """Read from the PlasmaBuffer as if it were just a regular buffer.""" - return self.buffer[index] + value = self.buffer[index] + if sys.version_info >= (3, 0) and not isinstance(index, slice): + value = chr(value) + return value def __setitem__(self, index, value): """Write to the PlasmaBuffer as if it were just a regular buffer. This should fail because the buffer should be read only. """ + if sys.version_info >= (3, 0) and not isinstance(index, slice): + value = ord(value) self.buffer[index] = value def __len__(self): @@ -103,7 +109,7 @@ class PlasmaClient(object): Exception: An exception is raised if the object could not be created. """ # Turn the metadata into the right type. - metadata = bytearray("") if metadata is None else metadata + metadata = bytearray(b"") if metadata is None else metadata buff = libplasma.create(self.conn, object_id, size, metadata) return PlasmaBuffer(buff, object_id, self) @@ -243,7 +249,7 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, use_valg """ if use_valgrind and use_profiler: raise Exception("Cannot use valgrind and profiler at the same time.") - plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_store") + plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "plasma_store") plasma_store_name = "/tmp/plasma_store{}".format(random_name()) command = [plasma_store_executable, "-s", plasma_store_name, "-m", str(plasma_store_memory)] if use_valgrind: @@ -273,7 +279,7 @@ def start_plasma_manager(store_name, redis_address, num_retries=20, use_valgrind Raises: Exception: An exception is raised if the manager could not be started. """ - plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_manager") + plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "plasma_manager") plasma_manager_name = "/tmp/plasma_manager{}".format(random_name()) port = None process = None diff --git a/src/plasma/plasma_extension.c b/src/plasma/plasma_extension.c index 538bba8cd..727a53b2f 100644 --- a/src/plasma/plasma_extension.c +++ b/src/plasma/plasma_extension.c @@ -1,4 +1,5 @@ #include +#include "bytesobject.h" #include "common.h" #include "io.h" @@ -17,8 +18,8 @@ static int PyObjectToPlasmaConnection(PyObject *object, } static int PyObjectToUniqueID(PyObject *object, object_id *object_id) { - if (PyString_Check(object)) { - memcpy(&object_id->id[0], PyString_AsString(object), UNIQUE_ID_SIZE); + if (PyBytes_Check(object)) { + memcpy(&object_id->id[0], PyBytes_AsString(object), UNIQUE_ID_SIZE); return 1; } else { PyErr_SetString(PyExc_TypeError, "must be a 20 character string"); @@ -75,7 +76,12 @@ PyObject *PyPlasma_create(PyObject *self, PyObject *args) { "an object with this ID could not be created"); return NULL; } + +#if PY_MAJOR_VERSION >= 3 + return PyMemoryView_FromMemory((void *) data, (Py_ssize_t) size, PyBUF_WRITE); +#else return PyBuffer_FromReadWriteMemory((void *) data, (Py_ssize_t) size); +#endif } PyObject *PyPlasma_hash(PyObject *self, PyObject *args) { @@ -88,7 +94,8 @@ PyObject *PyPlasma_hash(PyObject *self, PyObject *args) { unsigned char digest[DIGEST_SIZE]; bool success = plasma_compute_object_hash(conn, object_id, digest); if (success) { - PyObject *digest_string = PyString_FromStringAndSize(digest, DIGEST_SIZE); + PyObject *digest_string = + PyBytes_FromStringAndSize((char *) digest, DIGEST_SIZE); return digest_string; } else { Py_RETURN_NONE; @@ -132,9 +139,22 @@ PyObject *PyPlasma_get(PyObject *self, PyObject *args) { plasma_get(conn, object_id, &size, &data, &metadata_size, &metadata); Py_END_ALLOW_THREADS; PyObject *t = PyTuple_New(2); +#if PY_MAJOR_VERSION >= 3 + PyTuple_SetItem(t, 0, PyMemoryView_FromMemory((void *) data, + (Py_ssize_t) size, PyBUF_READ)); +#else PyTuple_SetItem(t, 0, PyBuffer_FromMemory((void *) data, (Py_ssize_t) size)); - PyTuple_SetItem(t, 1, PyByteArray_FromStringAndSize( - (void *) metadata, (Py_ssize_t) metadata_size)); +#endif + +#if PY_MAJOR_VERSION >= 3 + PyTuple_SetItem( + t, 1, PyMemoryView_FromMemory((void *) metadata, + (Py_ssize_t) metadata_size, PyBUF_READ)); +#else + PyTuple_SetItem( + t, 1, PyBuffer_FromMemory((void *) metadata, (Py_ssize_t) metadata_size)); +#endif + return t; } @@ -233,8 +253,8 @@ PyObject *PyPlasma_wait(PyObject *self, PyObject *args) { if (object_requests[i].status == PLASMA_OBJECT_LOCAL || object_requests[i].status == PLASMA_OBJECT_REMOTE) { PyObject *ready = - PyString_FromStringAndSize((char *) object_requests[i].object_id.id, - sizeof(object_requests[i].object_id)); + PyBytes_FromStringAndSize((char *) object_requests[i].object_id.id, + sizeof(object_requests[i].object_id)); PyList_SetItem(ready_ids, num_returned, ready); PySet_Discard(waiting_ids, ready); num_returned += 1; @@ -258,7 +278,7 @@ PyObject *PyPlasma_evict(PyObject *self, PyObject *args) { return NULL; } int64_t evicted_bytes = plasma_evict(conn, (int64_t) num_bytes); - return PyInt_FromLong((long) evicted_bytes); + return PyLong_FromLong((long) evicted_bytes); } PyObject *PyPlasma_delete(PyObject *self, PyObject *args) { @@ -298,7 +318,7 @@ PyObject *PyPlasma_subscribe(PyObject *self, PyObject *args) { } int sock = plasma_subscribe(conn); - return PyInt_FromLong(sock); + return PyLong_FromLong(sock); } PyObject *PyPlasma_receive_notification(PyObject *self, PyObject *args) { @@ -320,10 +340,10 @@ PyObject *PyPlasma_receive_notification(PyObject *self, PyObject *args) { } /* Construct a tuple from object_info and return. */ PyObject *t = PyTuple_New(3); - PyTuple_SetItem(t, 0, PyString_FromStringAndSize( + PyTuple_SetItem(t, 0, PyBytes_FromStringAndSize( (char *) object_info.obj_id.id, UNIQUE_ID_SIZE)); - PyTuple_SetItem(t, 1, PyInt_FromLong(object_info.data_size)); - PyTuple_SetItem(t, 2, PyInt_FromLong(object_info.metadata_size)); + PyTuple_SetItem(t, 1, PyLong_FromLong(object_info.data_size)); + PyTuple_SetItem(t, 2, PyLong_FromLong(object_info.metadata_size)); return t; } @@ -346,7 +366,7 @@ static PyMethodDef plasma_methods[] = { {"evict", PyPlasma_evict, METH_VARARGS, "Evict some objects until we recover some number of bytes."}, {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."}, - {"delete", PyPlasma_delete, METH_VARARGS, "Deleta a plasma object."}, + {"delete", PyPlasma_delete, METH_VARARGS, "Delete a plasma object."}, {"transfer", PyPlasma_transfer, METH_VARARGS, "Transfer object to another plasma manager."}, {"subscribe", PyPlasma_subscribe, METH_VARARGS, @@ -356,11 +376,45 @@ static PyMethodDef plasma_methods[] = { {NULL} /* Sentinel */ }; +#if PY_MAJOR_VERSION >= 3 +static struct PyModuleDef moduledef = { + PyModuleDef_HEAD_INIT, + "libplasma", /* m_name */ + "A Python client library for plasma.", /* m_doc */ + 0, /* m_size */ + plasma_methods, /* m_methods */ + NULL, /* m_reload */ + NULL, /* m_traverse */ + NULL, /* m_clear */ + NULL, /* m_free */ +}; +#endif + +#if PY_MAJOR_VERSION >= 3 +#define INITERROR return NULL +#else +#define INITERROR return +#endif + #ifndef PyMODINIT_FUNC /* declarations for DLL import/export */ #define PyMODINIT_FUNC void #endif -PyMODINIT_FUNC initlibplasma(void) { - Py_InitModule3("libplasma", plasma_methods, - "A Python client library for plasma"); +#if PY_MAJOR_VERSION >= 3 +#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void) +#else +#define MOD_INIT(name) PyMODINIT_FUNC init##name(void) +#endif + +MOD_INIT(libplasma) { +#if PY_MAJOR_VERSION >= 3 + PyObject *m = PyModule_Create(&moduledef); +#else + PyObject *m = Py_InitModule3("libplasma", plasma_methods, + "A Python client library for plasma."); +#endif + +#if PY_MAJOR_VERSION >= 3 + return m; +#endif } diff --git a/src/plasma/lib/python/setup.py b/src/plasma/setup.py similarity index 60% rename from src/plasma/lib/python/setup.py rename to src/plasma/setup.py index 6c638be24..45627c03b 100644 --- a/src/plasma/lib/python/setup.py +++ b/src/plasma/setup.py @@ -9,9 +9,11 @@ import subprocess class install(_install.install): def run(self): - subprocess.check_call(["make"], cwd="../../") - subprocess.check_call(["cmake", ".."], cwd="../../build") - subprocess.check_call(["make", "install"], cwd="../../build") + subprocess.check_call(["make"]) + subprocess.check_call(["cp", "build/plasma_store", "plasma/plasma_store"]) + subprocess.check_call(["cp", "build/plasma_manager", "plasma/plasma_manager"]) + subprocess.check_call(["cmake", ".."], cwd="./build") + subprocess.check_call(["make", "install"], cwd="./build") # Calling _install.install.run(self) does not fetch required packages and # instead performs an old-style install. See command/install.py in # setuptools. So, calling do_egg_install() manually here. @@ -21,7 +23,10 @@ setup(name="Plasma", version="0.0.1", description="Plasma client for Python", packages=find_packages(), - package_data={"plasma": ["libplasma.so"]}, + package_data={"plasma": ["plasma_store", + "plasma_manager", + "libplasma.so"], + }, cmdclass={"install": install}, include_package_data=True, zip_safe=False) diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index 60b7939fb..9a7309f61 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -24,13 +24,13 @@ def random_object_id(): return np.random.bytes(20) def generate_metadata(length): - metadata = length * ["\x00"] + metadata_buffer = bytearray(length) if length > 0: - metadata[0] = chr(random.randint(0, 255)) - metadata[-1] = chr(random.randint(0, 255)) + metadata_buffer[0] = random.randint(0, 255) + metadata_buffer[-1] = random.randint(0, 255) for _ in range(100): - metadata[random.randint(0, length - 1)] = chr(random.randint(0, 255)) - return bytearray("".join(metadata)) + metadata_buffer[random.randint(0, length - 1)] = random.randint(0, 255) + return metadata_buffer def write_to_data_buffer(buff, length): if length > 0: @@ -123,7 +123,7 @@ class TestPlasmaClient(unittest.TestCase): metadata_buffer = self.plasma_client.get_metadata(object_id) self.assertEqual(len(metadata), len(metadata_buffer)) for i in range(len(metadata)): - self.assertEqual(metadata[i], metadata_buffer[i]) + self.assertEqual(chr(metadata[i]), metadata_buffer[i]) def test_create_existing(self): # This test is partially used to test the code path in which we create an diff --git a/test/array_test.py b/test/array_test.py index 7c77bd55e..c1c75c6ea 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -7,6 +7,10 @@ import ray import numpy as np import time from numpy.testing import assert_equal, assert_almost_equal +import sys + +if sys.version_info >= (3, 0): + from importlib import reload import ray.array.remote as ra import ray.array.distributed as da diff --git a/test/failure_test.py b/test/failure_test.py index 7ca31d813..a4f55cdc5 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -4,8 +4,12 @@ from __future__ import print_function import unittest import ray +import sys import time +if sys.version_info >= (3, 0): + from importlib import reload + import ray.test.test_functions as test_functions def wait_for_errors(error_type, num_errors, timeout=10): @@ -23,9 +27,9 @@ class FailureTest(unittest.TestCase): ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE) test_functions.test_unknown_type.remote() - wait_for_errors("TaskError", 1) + wait_for_errors(b"TaskError", 1) error_info = ray.error_info() - self.assertEqual(len(error_info["TaskError"]), 1) + self.assertEqual(len(error_info[b"TaskError"]), 1) ray.worker.cleanup() @@ -57,17 +61,17 @@ class TaskStatusTest(unittest.TestCase): test_functions.throw_exception_fct1.remote() test_functions.throw_exception_fct1.remote() - wait_for_errors("TaskError", 2) + wait_for_errors(b"TaskError", 2) result = ray.error_info() - self.assertEqual(len(result["TaskError"]), 2) - for task in result["TaskError"]: - self.assertTrue("Test function 1 intentionally failed." in task.get("message")) + self.assertEqual(len(result[b"TaskError"]), 2) + for task in result[b"TaskError"]: + self.assertTrue(b"Test function 1 intentionally failed." in task.get(b"message")) x = test_functions.throw_exception_fct2.remote() try: ray.get(x) except Exception as e: - self.assertTrue("Test function 2 intentionally failed."in str(e)) + self.assertTrue("Test function 2 intentionally failed." in str(e)) else: self.assertTrue(False) # ray.get should throw an exception @@ -76,7 +80,7 @@ class TaskStatusTest(unittest.TestCase): try: ray.get(ref) except Exception as e: - self.assertTrue("Test function 3 intentionally failed."in str(e)) + self.assertTrue("Test function 3 intentionally failed." in str(e)) else: self.assertTrue(False) # ray.get should throw an exception @@ -100,8 +104,8 @@ class TaskStatusTest(unittest.TestCase): def __call__(self): return ray.remote(Foo()) - wait_for_errors("RemoteFunctionImportError", 1) - self.assertTrue("There is a problem here." in ray.error_info()["RemoteFunctionImportError"][0]["message"]) + wait_for_errors(b"RemoteFunctionImportError", 1) + self.assertTrue(b"There is a problem here." in ray.error_info()[b"RemoteFunctionImportError"][0][b"message"]) ray.worker.cleanup() @@ -115,9 +119,9 @@ class TaskStatusTest(unittest.TestCase): raise Exception("The initializer failed.") return 0 ray.reusables.foo = ray.Reusable(initializer) - wait_for_errors("ReusableVariableImportError", 1) + wait_for_errors(b"ReusableVariableImportError", 1) # Check that the error message is in the task info. - self.assertTrue("The initializer failed." in ray.error_info()["ReusableVariableImportError"][0]["message"]) + self.assertTrue(b"The initializer failed." in ray.error_info()[b"ReusableVariableImportError"][0][b"message"]) ray.worker.cleanup() @@ -133,9 +137,9 @@ class TaskStatusTest(unittest.TestCase): def use_foo(): ray.reusables.foo use_foo.remote() - wait_for_errors("ReusableVariableReinitializeError", 1) + wait_for_errors(b"ReusableVariableReinitializeError", 1) # Check that the error message is in the task info. - self.assertTrue("The reinitializer failed." in ray.error_info()["ReusableVariableReinitializeError"][0]["message"]) + self.assertTrue(b"The reinitializer failed." in ray.error_info()[b"ReusableVariableReinitializeError"][0][b"message"]) ray.worker.cleanup() @@ -146,11 +150,11 @@ class TaskStatusTest(unittest.TestCase): if ray.worker.global_worker.mode == ray.WORKER_MODE: raise Exception("Function to run failed.") ray.worker.global_worker.run_function_on_all_workers(f) - wait_for_errors("FunctionToRunError", 2) + wait_for_errors(b"FunctionToRunError", 2) # Check that the error message is in the task info. - self.assertEqual(len(ray.error_info()["FunctionToRunError"]), 2) - self.assertTrue("Function to run failed." in ray.error_info()["FunctionToRunError"][0]["message"]) - self.assertTrue("Function to run failed." in ray.error_info()["FunctionToRunError"][1]["message"]) + self.assertEqual(len(ray.error_info()[b"FunctionToRunError"]), 2) + self.assertTrue(b"Function to run failed." in ray.error_info()[b"FunctionToRunError"][0][b"message"]) + self.assertTrue(b"Function to run failed." in ray.error_info()[b"FunctionToRunError"][1][b"message"]) ray.worker.cleanup() diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index 9082aa21e..061fb46da 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -4,9 +4,13 @@ from __future__ import print_function import unittest import ray +import sys import time import numpy as np +if sys.version_info >= (3, 0): + from importlib import reload + import ray.test.test_functions as test_functions class MicroBenchmarkTest(unittest.TestCase): diff --git a/test/runtest.py b/test/runtest.py index 59d3112b0..ee6a2acef 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -10,6 +10,9 @@ import string import sys from collections import namedtuple +if sys.version_info >= (3, 0): + from importlib import reload + import ray.test.test_functions as test_functions import ray.array.remote as ra import ray.array.distributed as da @@ -24,7 +27,7 @@ def assert_equal(obj1, obj2): np.testing.assert_equal(obj1, obj2) elif hasattr(obj1, "__dict__") and hasattr(obj2, "__dict__"): special_keys = ["_pytype_"] - assert set(obj1.__dict__.keys() + special_keys) == set(obj2.__dict__.keys() + special_keys), "Objects {} and {} are different.".format(obj1, obj2) + assert set(list(obj1.__dict__.keys()) + special_keys) == set(list(obj2.__dict__.keys()) + special_keys), "Objects {} and {} are different.".format(obj1, obj2) for key in obj1.__dict__.keys(): if key not in special_keys: assert_equal(obj1.__dict__[key], obj2.__dict__[key]) @@ -40,17 +43,25 @@ def assert_equal(obj1, obj2): assert len(obj1) == len(obj2), "Objects {} and {} are tuples with different lengths.".format(obj1, obj2) for i in range(len(obj1)): assert_equal(obj1[i], obj2[i]) + elif ray.serialization.is_named_tuple(type(obj1)) or ray.serialization.is_named_tuple(type(obj2)): + assert len(obj1) == len(obj2), "Objects {} and {} are named tuples with different lengths.".format(obj1, obj2) + for i in range(len(obj1)): + assert_equal(obj1[i], obj2[i]) else: assert obj1 == obj2, "Objects {} and {} are different.".format(obj1, obj2) -PRIMITIVE_OBJECTS = [0, 0.0, 0.9, 0L, 1L << 62, "a", string.printable, "\u262F", +if sys.version_info >= (3, 0): + long_extras = [0, np.array([["hi", u"hi"], [1.3, 1]])] +else: + long_extras = [long(0), np.array([["hi", u"hi"], [1.3, long(1)]])] + +PRIMITIVE_OBJECTS = [0, 0.0, 0.9, 1 << 62, "a", string.printable, "\u262F", u"hello world", u"\xff\xfe\x9c\x001\x000\x00", None, True, False, [], (), {}, np.int8(3), np.int32(4), np.int64(5), np.uint8(3), np.uint32(4), np.uint64(5), np.float32(1.9), np.float64(1.9), np.zeros([100, 100]), np.random.normal(size=[100, 100]), np.array(["hi", 3]), - np.array(["hi", 3], dtype=object), - np.array([["hi", u"hi"], [1.3, 1L]])] + np.array(["hi", 3], dtype=object)] + long_extras COMPLEX_OBJECTS = [[[[[[[[[[[[[]]]]]]]]]]]], {"obj{}".format(i): np.random.normal(size=[100, 100]) for i in range(10)}, @@ -299,7 +310,7 @@ class APITest(unittest.TestCase): print("Still using old definition of f, trying again.") # Test that we can close over plain old data. - data = [np.zeros([3, 5]), (1, 2, "a"), [0.0, 1.0, 2L], 2L, {"a": np.zeros(3)}] + data = [np.zeros([3, 5]), (1, 2, "a"), [0.0, 1.0, 1 << 62], 1 << 60, {"a": np.zeros(3)}] @ray.remote def g(): return data @@ -334,7 +345,7 @@ class APITest(unittest.TestCase): def testGetMultiple(self): ray.init(start_ray_local=True, num_workers=0) object_ids = [ray.put(i) for i in range(10)] - self.assertEqual(ray.get(object_ids), range(10)) + self.assertEqual(ray.get(object_ids), list(range(10))) ray.worker.cleanup() def testWait(self):