From bcd5af78c7d38caecf16ae9e824d61a84485e6c2 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Wed, 20 Feb 2019 22:29:25 -0800 Subject: [PATCH] Lint Cython files (#4097) --- .travis/format.sh | 4 +- .../cython/cython_examples/cython_blas.pyx | 12 +- .../cython/cython_examples/cython_simple.pyx | 9 +- python/ray/_raylet.pyx | 147 +++++++++++------- python/ray/includes/common.pxd | 27 +++- python/ray/includes/libraylet.pxd | 24 +-- python/ray/includes/ray_config.pxi | 12 +- python/ray/includes/task.pxd | 46 +++--- python/ray/includes/task.pxi | 65 +++++--- python/ray/includes/unique_ids.pxd | 12 +- python/ray/includes/unique_ids.pxi | 10 +- 11 files changed, 239 insertions(+), 129 deletions(-) diff --git a/.travis/format.sh b/.travis/format.sh index 9313e6410..be8ac2b91 100755 --- a/.travis/format.sh +++ b/.travis/format.sh @@ -48,13 +48,15 @@ format_changed() { # exist on both branches. MERGEBASE="$(git merge-base upstream/master HEAD)" - if ! git diff --diff-filter=ACM --quiet --exit-code "$MERGEBASE" -- '*.py' &>/dev/null; then + if ! 
git diff --diff-filter=ACM --quiet --exit-code "$MERGEBASE" -- '*.p*' &>/dev/null; then git diff --name-only --diff-filter=ACM "$MERGEBASE" -- '*.py' | xargs -P 5 \ yapf --in-place "${YAPF_EXCLUDES[@]}" "${YAPF_FLAGS[@]}" if which flake8 >/dev/null; then git diff --name-only --diff-filter=ACM "$MERGEBASE" -- '*.py' | xargs -P 5 \ flake8 --exclude=python/ray/core/generated/,doc/source/conf.py,python/ray/cloudpickle/ \ --ignore=C408,E121,E123,E126,E226,E24,E704,W503,W504,W605 + git diff --name-only --diff-filter=ACM "$MERGEBASE" -- '*.p*' | xargs -P 5 \ + flake8 --filename="*.pyx,*.pxd,*.pxi" --ignore=E211,E225,E226,E227,E999 fi fi diff --git a/examples/cython/cython_examples/cython_blas.pyx b/examples/cython/cython_examples/cython_blas.pyx index 7228b28e9..0f0e3819b 100644 --- a/examples/cython/cython_examples/cython_blas.pyx +++ b/examples/cython/cython_examples/cython_blas.pyx @@ -20,6 +20,7 @@ cimport scipy.linalg.cython_blas as blas + def compute_self_corr_for_voxel_sel(py_trans_a, py_trans_b, py_m, py_n, py_k, py_alpha, py_a, py_lda, int py_start_voxel, py_b, py_ldb, py_beta, py_c, py_ldc, @@ -116,7 +117,9 @@ def compute_self_corr_for_voxel_sel(py_trans_a, py_trans_b, py_m, py_n, py_k, cdef float[:, :, ::1] C C = py_c blas.sgemm(trans_a, trans_b, &M, &N, &K, &alpha, &A[0, 0], &lda, - &B[0, py_start_voxel], &ldb, &beta, &C[0, py_start_epoch, 0], &ldc) + &B[0, py_start_voxel], &ldb, &beta, + &C[0, py_start_epoch, 0], &ldc) + def compute_kernel_matrix(py_uplo, py_trans, py_n, py_k, py_alpha, py_a, int py_start_voxel, py_lda, @@ -209,6 +212,7 @@ def compute_kernel_matrix(py_uplo, py_trans, py_n, py_k, py_alpha, py_a, for k in range(j): py_c[k, j] = py_c[j, k] + def compute_single_self_corr_syrk(py_uplo, py_trans, py_n, py_k, py_alpha, py_a, py_lda, py_beta, py_c, py_ldc, @@ -298,6 +302,7 @@ def compute_single_self_corr_syrk(py_uplo, py_trans, py_n, py_k, for k in range(j): py_c[py_start_sample, k, j] = py_c[py_start_sample, j, k] + def 
compute_single_self_corr_gemm(py_trans_a, py_trans_b, py_m, py_n, py_k, py_alpha, py_a, py_lda, py_ldb, py_beta, py_c, py_ldc, @@ -388,6 +393,7 @@ def compute_single_self_corr_gemm(py_trans_a, py_trans_b, py_m, py_n, blas.sgemm(trans_a, trans_b, &M, &N, &K, &alpha, &A[0, 0], &lda, &A[0, 0], &ldb, &beta, &C[py_start_sample, 0, 0], &ldc) + def compute_corr_vectors(py_trans_a, py_trans_b, py_m, py_n, py_k, py_alpha, py_a, py_lda, py_b, py_ldb, py_beta, py_c, py_ldc, @@ -478,7 +484,9 @@ def compute_corr_vectors(py_trans_a, py_trans_b, py_m, py_n, cdef float[:, :, ::1] C C = py_c blas.sgemm(trans_a, trans_b, &M, &N, &K, &alpha, &A[0, 0], &lda, - &B[0, py_start_voxel], &ldb, &beta, &C[py_start_sample, 0, 0], &ldc) + &B[0, py_start_voxel], &ldb, &beta, + &C[py_start_sample, 0, 0], &ldc) + def compute_single_matrix_multiplication(py_trans_a, py_trans_b, py_m, py_n, py_k, py_alpha, py_a, py_lda, diff --git a/examples/cython/cython_examples/cython_simple.pyx b/examples/cython/cython_examples/cython_simple.pyx index 9ef0406ec..c02b98dea 100644 --- a/examples/cython/cython_examples/cython_simple.pyx +++ b/examples/cython/cython_examples/cython_simple.pyx @@ -1,36 +1,43 @@ #!python # cython: embedsignature=True, binding=True + def simple_func(x, y, z): return x + y + z + # Cython code directly callable from Python def fib(n): if n < 2: return n return fib(n-2) + fib(n-1) + # Typed Cython code def fib_int(int n): if n < 2: return n return fib_int(n-2) + fib_int(n-1) + # Cython-Python code cpdef fib_cpdef(int n): if n < 2: return n return fib_cpdef(n-2) + fib_cpdef(n-1) + # C code def fib_cdef(int n): return fib_in_c(n) + cdef int fib_in_c(int n): if n < 2: return n return fib_in_c(n-2) + fib_in_c(n-1) + # Simple class class simple_class(object): def __init__(self): @@ -39,5 +46,3 @@ class simple_class(object): def increment(self): self.value += 1 return self.value - - diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index ae1abfc88..b287e60c4 100644 --- 
a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3,6 +3,8 @@ # cython: embedsignature = True # cython: language_level = 3 +import numpy + from libc.stdint cimport int32_t, int64_t from libcpp cimport bool as c_bool from libcpp.memory cimport unique_ptr @@ -12,11 +14,6 @@ from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector as c_vector from cython.operator import dereference, postincrement -cimport cpython - -include "includes/unique_ids.pxi" -include "includes/ray_config.pxi" -include "includes/task.pxi" from ray.includes.common cimport ( CLanguage, @@ -40,12 +37,17 @@ from ray.includes.task cimport CTaskSpecification from ray.includes.ray_config cimport RayConfig from ray.utils import decode +cimport cpython + +include "includes/unique_ids.pxi" +include "includes/ray_config.pxi" +include "includes/task.pxi" + if cpython.PY_MAJOR_VERSION >= 3: import pickle else: import cPickle as pickle -import numpy cdef int check_status(const CRayStatus& status) nogil except -1: @@ -83,7 +85,8 @@ cdef VectorToObjectIDs(c_vector[CObjectID] object_ids): def compute_put_id(TaskID task_id, int64_t put_index): if put_index < 1 or put_index > kMaxTaskPuts: - raise ValueError("The range of 'put_index' should be [1, %d]" % kMaxTaskPuts) + raise ValueError("The range of 'put_index' should be [1, %d]" + % kMaxTaskPuts) return ObjectID.from_native(ComputePutId(task_id.data, put_index)) @@ -97,42 +100,53 @@ cdef c_bool is_simple_value(value, int *num_elements_contained): if num_elements_contained[0] >= RayConfig.instance().num_elements_limit(): return False - if (cpython.PyInt_Check(value) or cpython.PyLong_Check(value) or value is False or - value is True or cpython.PyFloat_Check(value) or value is None): + if (cpython.PyInt_Check(value) or cpython.PyLong_Check(value) or + value is False or value is True or cpython.PyFloat_Check(value) or + value is None): return True if cpython.PyBytes_CheckExact(value): num_elements_contained[0] += 
cpython.PyBytes_Size(value) - return num_elements_contained[0] < RayConfig.instance().num_elements_limit() + return (num_elements_contained[0] < + RayConfig.instance().num_elements_limit()) if cpython.PyUnicode_CheckExact(value): num_elements_contained[0] += cpython.PyUnicode_GET_SIZE(value) - return num_elements_contained[0] < RayConfig.instance().num_elements_limit() + return (num_elements_contained[0] < + RayConfig.instance().num_elements_limit()) - if cpython.PyList_CheckExact(value) and cpython.PyList_Size(value) < RayConfig.instance().size_limit(): + if (cpython.PyList_CheckExact(value) and + cpython.PyList_Size(value) < RayConfig.instance().size_limit()): for item in value: if not is_simple_value(item, num_elements_contained): return False - return num_elements_contained[0] < RayConfig.instance().num_elements_limit() + return (num_elements_contained[0] < + RayConfig.instance().num_elements_limit()) - if cpython.PyDict_CheckExact(value) and cpython.PyDict_Size(value) < RayConfig.instance().size_limit(): + if (cpython.PyDict_CheckExact(value) and + cpython.PyDict_Size(value) < RayConfig.instance().size_limit()): # TODO(suquark): Using "items" in Python2 is not very efficient. 
for k, v in value.items(): - if not (is_simple_value(k, num_elements_contained) and is_simple_value(v, num_elements_contained)): + if not (is_simple_value(k, num_elements_contained) and + is_simple_value(v, num_elements_contained)): return False - return num_elements_contained[0] < RayConfig.instance().num_elements_limit() + return (num_elements_contained[0] < + RayConfig.instance().num_elements_limit()) - if cpython.PyTuple_CheckExact(value) and cpython.PyTuple_Size(value) < RayConfig.instance().size_limit(): + if (cpython.PyTuple_CheckExact(value) and + cpython.PyTuple_Size(value) < RayConfig.instance().size_limit()): for item in value: if not is_simple_value(item, num_elements_contained): return False - return num_elements_contained[0] < RayConfig.instance().num_elements_limit() + return (num_elements_contained[0] < + RayConfig.instance().num_elements_limit()) if isinstance(value, numpy.ndarray): if value.dtype == "O": return False num_elements_contained[0] += value.nbytes - return num_elements_contained[0] < RayConfig.instance().num_elements_limit() + return (num_elements_contained[0] < + RayConfig.instance().num_elements_limit()) return False @@ -140,12 +154,12 @@ cdef c_bool is_simple_value(value, int *num_elements_contained): def check_simple_value(value): """Check if value is simple enough to be send by value. - This method checks if a Python object is sufficiently simple that it can be - serialized and passed by value as an argument to a task (without being put in - the object store). The details of which objects are sufficiently simple are - defined by this method and are not particularly important. But for - performance reasons, it is better to place "small" objects in the task itself - and "large" objects in the object store. + This method checks if a Python object is sufficiently simple that it can + be serialized and passed by value as an argument to a task (without being + put in the object store). 
The details of which objects are sufficiently + simple are defined by this method and are not particularly important. + But for performance reasons, it is better to place "small" objects in + the task itself and "large" objects in the object store. Args: value: Python object that should be checked. @@ -160,6 +174,7 @@ def check_simple_value(value): cdef class Language: cdef CLanguage lang + def __cinit__(self, int32_t lang): self.lang = lang @@ -168,7 +183,8 @@ cdef class Language: return Language(lang) def __eq__(self, other): - return isinstance(other, Language) and (self.lang) == (other.lang) + return (isinstance(other, Language) and + (self.lang) == (other.lang)) def __repr__(self): if self.lang == LANGUAGE_PYTHON: @@ -187,7 +203,7 @@ cdef Language LANG_CPP = Language.from_native(LANGUAGE_CPP) cdef Language LANG_JAVA = Language.from_native(LANGUAGE_JAVA) -cdef unordered_map[c_string, double] resource_map_from_python_dict(resource_map): +cdef unordered_map[c_string, double] resource_map_from_dict(resource_map): cdef: unordered_map[c_string, double] out c_string resource_name @@ -200,20 +216,25 @@ cdef unordered_map[c_string, double] resource_map_from_python_dict(resource_map) cdef class RayletClient: cdef unique_ptr[CRayletClient] client + def __cinit__(self, raylet_socket, ClientID client_id, c_bool is_worker, DriverID driver_id): - # We know that we are using Python, so just skip the language parameter. + # We know that we are using Python, so just skip the language + # parameter. # TODO(suquark): Should we allow unicode chars in "raylet_socket"? 
- self.client.reset(new CRayletClient(raylet_socket.encode("ascii"), client_id.data, - is_worker, driver_id.data, LANGUAGE_PYTHON)) + self.client.reset(new CRayletClient( + raylet_socket.encode("ascii"), client_id.data, is_worker, + driver_id.data, LANGUAGE_PYTHON)) def disconnect(self): check_status(self.client.get().Disconnect()) def submit_task(self, Task task_spec): - check_status(self.client.get().SubmitTask(task_spec.execution_dependencies.get()[0], task_spec.task_spec.get()[0])) + check_status(self.client.get().SubmitTask( + task_spec.execution_dependencies.get()[0], + task_spec.task_spec.get()[0])) def get_task(self): cdef: @@ -227,9 +248,11 @@ cdef class RayletClient: check_status(self.client.get().TaskDone()) def fetch_or_reconstruct(self, object_ids, - c_bool fetch_only, TaskID current_task_id=TaskID.nil()): + c_bool fetch_only, + TaskID current_task_id=TaskID.nil()): cdef c_vector[CObjectID] fetch_ids = ObjectIDsToVector(object_ids) - check_status(self.client.get().FetchOrReconstruct(fetch_ids, fetch_only, current_task_id.data)) + check_status(self.client.get().FetchOrReconstruct( + fetch_ids, fetch_only, current_task_id.data)) def notify_unblocked(self, TaskID current_task_id): check_status(self.client.get().NotifyUnblocked(current_task_id.data)) @@ -240,14 +263,19 @@ cdef class RayletClient: WaitResultPair result c_vector[CObjectID] wait_ids wait_ids = ObjectIDsToVector(object_ids) - check_status(self.client.get().Wait(wait_ids, num_returns, timeout_milliseconds, - wait_local, current_task_id.data, &result)) - return VectorToObjectIDs(result.first), VectorToObjectIDs(result.second) + check_status(self.client.get().Wait(wait_ids, num_returns, + timeout_milliseconds, wait_local, + current_task_id.data, &result)) + return (VectorToObjectIDs(result.first), + VectorToObjectIDs(result.second)) def resource_ids(self): cdef: - ResourceMappingType resource_mapping = self.client.get().GetResourceIDs() - unordered_map[c_string, c_vector[pair[int64_t, 
double]]].iterator iterator = resource_mapping.begin() + ResourceMappingType resource_mapping = ( + self.client.get().GetResourceIDs()) + unordered_map[ + c_string, c_vector[pair[int64_t, double]] + ].iterator iterator = resource_mapping.begin() c_vector[pair[int64_t, double]] c_value resources_dict = {} while iterator != resource_mapping.end(): @@ -255,7 +283,8 @@ cdef class RayletClient: c_value = dereference(iterator).second ids_and_fractions = [] for i in range(c_value.size()): - ids_and_fractions.append((c_value[i].first, c_value[i].second)) + ids_and_fractions.append( + (c_value[i].first, c_value[i].second)) resources_dict[key] = ids_and_fractions postincrement(iterator) return resources_dict @@ -284,15 +313,18 @@ cdef class RayletClient: for py_profile_event in profile_data: profile_event = new GCSProfileEventT() if not isinstance(py_profile_event, dict): - raise TypeError("Incorrect type for a profile event. Expected dict instead of '%s'" % str(type(py_profile_event))) - # TODO(rkn): If the dictionary is formatted incorrectly, that could lead - # to errors. E.g., if any of the strings are empty, that will cause - # segfaults in the node manager. + raise TypeError( + "Incorrect type for a profile event. Expected dict " + "instead of '%s'" % str(type(py_profile_event))) + # TODO(rkn): If the dictionary is formatted incorrectly, that + # could lead to errors. E.g., if any of the strings are empty, + # that will cause segfaults in the node manager. 
for key_string, event_data in py_profile_event.items(): if key_string == "event_type": profile_event.event_type = event_data.encode("ascii") if profile_event.event_type.length() == 0: - raise ValueError("'event_type' should not be a null string.") + raise ValueError( + "'event_type' should not be a null string.") elif key_string == "start_time": profile_event.start_time = float(event_data) elif key_string == "end_time": @@ -300,13 +332,17 @@ cdef class RayletClient: elif key_string == "extra_data": profile_event.extra_data = event_data.encode("ascii") if profile_event.extra_data.length() == 0: - raise ValueError("'extra_data' should not be a null string.") + raise ValueError( + "'extra_data' should not be a null string.") else: - raise ValueError("Unknown profile event key '%s'" % key_string) - # Note that profile_info.profile_events is a vector of unique pointers, so - # profile_event will be deallocated when profile_info goes out of scope. - # "emplace_back" of vector has not been supported by Cython - profile_info.profile_events.push_back(unique_ptr[GCSProfileEventT](profile_event)) + raise ValueError( + "Unknown profile event key '%s'" % key_string) + # Note that profile_info.profile_events is a vector of unique + # pointers, so profile_event will be deallocated when profile_info + # goes out of scope. 
"emplace_back" of vector has not been + # supported by Cython + profile_info.profile_events.push_back( + unique_ptr[GCSProfileEventT](profile_event)) check_status(self.client.get().PushProfileEvents(profile_info)) @@ -316,11 +352,14 @@ cdef class RayletClient: def prepare_actor_checkpoint(self, ActorID actor_id): cdef CActorCheckpointID checkpoint_id - check_status(self.client.get().PrepareActorCheckpoint(actor_id.data, checkpoint_id)) - return ObjectID.from_native(checkpoint_id); + check_status(self.client.get().PrepareActorCheckpoint( + actor_id.data, checkpoint_id)) + return ObjectID.from_native(checkpoint_id) - def notify_actor_resumed_from_checkpoint(self, ActorID actor_id, ActorCheckpointID checkpoint_id): - check_status(self.client.get().NotifyActorResumedFromCheckpoint(actor_id.data, checkpoint_id.data)) + def notify_actor_resumed_from_checkpoint(self, ActorID actor_id, + ActorCheckpointID checkpoint_id): + check_status(self.client.get().NotifyActorResumedFromCheckpoint( + actor_id.data, checkpoint_id.data)) @property def language(self): diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index c25cab321..2fd9aaa37 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -14,29 +14,37 @@ from ray.includes.unique_ids cimport ( cdef extern from "ray/status.h" namespace "ray" nogil: cdef cppclass StatusCode: - pass + pass cdef cppclass CRayStatus "ray::Status": RayStatus() RayStatus(StatusCode code, const c_string &msg) - RayStatus(const CRayStatus &s); + RayStatus(const CRayStatus &s) @staticmethod CRayStatus OK() + @staticmethod CRayStatus OutOfMemory() + @staticmethod CRayStatus KeyError() + @staticmethod CRayStatus Invalid() + @staticmethod CRayStatus IOError() + @staticmethod CRayStatus TypeError() + @staticmethod CRayStatus UnknownError() + @staticmethod CRayStatus NotImplemented() + @staticmethod CRayStatus RedisError() @@ -75,12 +83,12 @@ cdef extern from "ray/status.h" namespace "ray::StatusCode" 
nogil: cdef extern from "ray/id.h" namespace "ray" nogil: const CTaskID FinishTaskId(const CTaskID &task_id) const CObjectID ComputeReturnId(const CTaskID &task_id, - int64_t return_index) + int64_t return_index) const CObjectID ComputePutId(const CTaskID &task_id, int64_t put_index) const CTaskID ComputeTaskId(const CObjectID &object_id) const CTaskID GenerateTaskId(const CDriverID &driver_id, - const CTaskID &parent_task_id, - int parent_task_counter) + const CTaskID &parent_task_id, + int parent_task_counter) int64_t ComputeObjectIndex(const CObjectID &object_id) @@ -92,18 +100,21 @@ cdef extern from "ray/gcs/format/gcs_generated.h" nogil: pass -# This is a workaround for C++ enum class since Cython has no corresponding representation. +# This is a workaround for C++ enum class since Cython has no corresponding +# representation. cdef extern from "ray/gcs/format/gcs_generated.h" namespace "Language" nogil: cdef CLanguage LANGUAGE_PYTHON "Language::PYTHON" cdef CLanguage LANGUAGE_CPP "Language::CPP" cdef CLanguage LANGUAGE_JAVA "Language::JAVA" -cdef extern from "ray/raylet/scheduling_resources.h" namespace "ray::raylet" nogil: +cdef extern from "ray/raylet/scheduling_resources.h" \ + namespace "ray::raylet" nogil: cdef cppclass ResourceSet "ResourceSet": ResourceSet() ResourceSet(const unordered_map[c_string, double] &resource_map) - ResourceSet(const c_vector[c_string] &resource_labels, const c_vector[double] resource_capacity) + ResourceSet(const c_vector[c_string] &resource_labels, + const c_vector[double] resource_capacity) c_bool operator==(const ResourceSet &rhs) const c_bool IsEqual(const ResourceSet &other) const c_bool IsSubset(const ResourceSet &other) const diff --git a/python/ray/includes/libraylet.pxd b/python/ray/includes/libraylet.pxd index 29e444a50..1a4ffb250 100644 --- a/python/ray/includes/libraylet.pxd +++ b/python/ray/includes/libraylet.pxd @@ -37,7 +37,8 @@ cdef extern from "ray/gcs/format/gcs_generated.h" nogil: GCSProfileTableDataT() 
-ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] ResourceMappingType +ctypedef unordered_map[c_string, c_vector[pair[int64_t, double]]] \ + ResourceMappingType ctypedef pair[c_vector[CObjectID], c_vector[CObjectID]] WaitResultPair @@ -48,22 +49,25 @@ cdef extern from "ray/raylet/raylet_client.h" nogil: c_bool is_worker, const CDriverID &driver_id, const CLanguage &language) CRayStatus Disconnect() - CRayStatus SubmitTask(const c_vector[CObjectID] &execution_dependencies, - const CTaskSpecification &task_spec) + CRayStatus SubmitTask( + const c_vector[CObjectID] &execution_dependencies, + const CTaskSpecification &task_spec) CRayStatus GetTask(unique_ptr[CTaskSpecification] *task_spec) CRayStatus TaskDone() CRayStatus FetchOrReconstruct(c_vector[CObjectID] &object_ids, - c_bool fetch_only, - const CTaskID &current_task_id) + c_bool fetch_only, + const CTaskID &current_task_id) CRayStatus NotifyUnblocked(const CTaskID &current_task_id) - CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_returns, - int64_t timeout_milliseconds, c_bool wait_local, - const CTaskID &current_task_id, WaitResultPair *result) + CRayStatus Wait(const c_vector[CObjectID] &object_ids, + int num_returns, int64_t timeout_milliseconds, + c_bool wait_local, const CTaskID &current_task_id, + WaitResultPair *result) CRayStatus PushError(const CDriverID &job_id, const c_string &type, const c_string &error_message, double timestamp) - CRayStatus PushProfileEvents(const GCSProfileTableDataT &profile_events) + CRayStatus PushProfileEvents( + const GCSProfileTableDataT &profile_events) CRayStatus FreeObjects(const c_vector[CObjectID] &object_ids, - c_bool local_only) + c_bool local_only) CRayStatus PrepareActorCheckpoint(const CActorID &actor_id, CActorCheckpointID &checkpoint_id) CRayStatus NotifyActorResumedFromCheckpoint( diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index 585cd32a6..6e16eaf03 100644 --- a/python/ray/includes/ray_config.pxi +++ 
b/python/ray/includes/ray_config.pxi @@ -27,7 +27,8 @@ cdef class Config: @staticmethod def initial_reconstruction_timeout_milliseconds(): - return RayConfig.instance().initial_reconstruction_timeout_milliseconds() + return (RayConfig.instance() + .initial_reconstruction_timeout_milliseconds()) @staticmethod def get_timeout_milliseconds(): @@ -59,11 +60,13 @@ cdef class Config: @staticmethod def local_scheduler_fetch_timeout_milliseconds(): - return RayConfig.instance().local_scheduler_fetch_timeout_milliseconds() + return (RayConfig.instance() + .local_scheduler_fetch_timeout_milliseconds()) @staticmethod def local_scheduler_reconstruction_timeout_milliseconds(): - return RayConfig.instance().local_scheduler_reconstruction_timeout_milliseconds() + return (RayConfig.instance() + .local_scheduler_reconstruction_timeout_milliseconds()) @staticmethod def max_num_to_reconstruct(): @@ -119,7 +122,8 @@ cdef class Config: @staticmethod def node_manager_forward_task_retry_timeout_milliseconds(): - return RayConfig.instance().node_manager_forward_task_retry_timeout_milliseconds() + return (RayConfig.instance() + .node_manager_forward_task_retry_timeout_milliseconds()) @staticmethod def object_manager_pull_timeout_ms(): diff --git a/python/ray/includes/task.pxd b/python/ray/includes/task.pxd index 9e487a2ce..128560f7b 100644 --- a/python/ray/includes/task.pxd +++ b/python/ray/includes/task.pxd @@ -18,10 +18,13 @@ from ray.includes.unique_ids cimport ( ) -cdef extern from "ray/raylet/task_execution_spec.h" namespace "ray::raylet" nogil: - cdef cppclass CTaskExecutionSpecification "ray::raylet::TaskExecutionSpecification": +cdef extern from "ray/raylet/task_execution_spec.h" \ + namespace "ray::raylet" nogil: + cdef cppclass CTaskExecutionSpecification \ + "ray::raylet::TaskExecutionSpecification": CTaskExecutionSpecification(const c_vector[CObjectID] &&dependencies) - CTaskExecutionSpecification(const c_vector[CObjectID] &&dependencies, int num_forwards) + 
CTaskExecutionSpecification( + const c_vector[CObjectID] &&dependencies, int num_forwards) c_vector[CObjectID] ExecutionDependencies() const void SetExecutionDependencies(const c_vector[CObjectID] &dependencies) int NumForwards() const @@ -34,31 +37,35 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil: cdef cppclass CTaskArgument "ray::raylet::TaskArgument": pass - cdef cppclass CTaskArgumentByReference "ray::raylet::TaskArgumentByReference": - CTaskArgumentByReference(const c_vector[CObjectID] &references); + cdef cppclass CTaskArgumentByReference \ + "ray::raylet::TaskArgumentByReference": + CTaskArgumentByReference(const c_vector[CObjectID] &references) cdef cppclass CTaskArgumentByValue "ray::raylet::TaskArgumentByValue": - CTaskArgumentByValue(const uint8_t *value, size_t length); + CTaskArgumentByValue(const uint8_t *value, size_t length) cdef cppclass CTaskSpecification "ray::raylet::TaskSpecification": - CTaskSpecification(const CDriverID &driver_id, const CTaskID &parent_task_id, - int64_t parent_counter, - const c_vector[shared_ptr[CTaskArgument]] &task_arguments, - int64_t num_returns, - const unordered_map[c_string, double] &required_resources, - const CLanguage &language, - const c_vector[c_string] &function_descriptor) CTaskSpecification( - const CDriverID &driver_id, const CTaskID &parent_task_id, int64_t parent_counter, - const CActorID &actor_creation_id, const CObjectID &actor_creation_dummy_object_id, + const CDriverID &driver_id, const CTaskID &parent_task_id, + int64_t parent_counter, + const c_vector[shared_ptr[CTaskArgument]] &task_arguments, + int64_t num_returns, + const unordered_map[c_string, double] &required_resources, + const CLanguage &language, + const c_vector[c_string] &function_descriptor) + CTaskSpecification( + const CDriverID &driver_id, const CTaskID &parent_task_id, + int64_t parent_counter, const CActorID &actor_creation_id, + const CObjectID &actor_creation_dummy_object_id, int64_t 
max_actor_reconstructions, const CActorID &actor_id, const CActorHandleID &actor_handle_id, int64_t actor_counter, const c_vector[CActorHandleID] &new_actor_handles, const c_vector[shared_ptr[CTaskArgument]] &task_arguments, int64_t num_returns, const unordered_map[c_string, double] &required_resources, - const unordered_map[c_string, double] &required_placement_resources, - const CLanguage &language, const c_vector[c_string] &function_descriptor) + const unordered_map[c_string, double] &required_placement_res, + const CLanguage &language, + const c_vector[c_string] &function_descriptor) CTaskSpecification(const c_string &string) c_string SerializeAsString() const @@ -97,7 +104,7 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil: cdef extern from "ray/raylet/task.h" namespace "ray::raylet" nogil: cdef cppclass CTask "ray::raylet::Task": CTask(const CTaskExecutionSpecification &execution_spec, - const CTaskSpecification &task_spec) + const CTaskSpecification &task_spec) const CTaskExecutionSpecification &GetTaskExecutionSpec() const const CTaskSpecification &GetTaskSpecification() const void SetExecutionDependencies(const c_vector[CObjectID] &dependencies) @@ -105,5 +112,6 @@ cdef extern from "ray/raylet/task.h" namespace "ray::raylet" nogil: const c_vector[CObjectID] &GetDependencies() const void CopyTaskExecutionSpec(const CTask &task) - cdef c_string SerializeTaskAsString(const c_vector[CObjectID] *dependencies, + cdef c_string SerializeTaskAsString( + const c_vector[CObjectID] *dependencies, const CTaskSpecification *task_spec) diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index 32d878ae1..9ba33908d 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -37,42 +37,57 @@ cdef class Task: for item in function_descriptor: if not isinstance(item, bytes): - raise TypeError("'function_descriptor' takes a list of byte strings.") + raise TypeError( + "'function_descriptor' takes a list of byte 
strings.") c_function_descriptor.push_back(item) # Parse the resource map. if resource_map is not None: - required_resources = resource_map_from_python_dict(resource_map) + required_resources = resource_map_from_dict(resource_map) if required_resources.count(b"CPU") == 0: required_resources[b"CPU"] = 1.0 if placement_resource_map is not None: - required_placement_resources = resource_map_from_python_dict(placement_resource_map) + required_placement_resources = ( + resource_map_from_dict(placement_resource_map)) # Parse the arguments from the list. for arg in arguments: if isinstance(arg, ObjectID): references = c_vector[CObjectID]() references.push_back((arg).data) - task_args.push_back(static_pointer_cast[CTaskArgument, CTaskArgumentByReference](make_shared[CTaskArgumentByReference](references))) + task_args.push_back( + static_pointer_cast[CTaskArgument, + CTaskArgumentByReference]( + make_shared[CTaskArgumentByReference](references))) else: - pickled_str = pickle.dumps(arg, protocol=pickle.HIGHEST_PROTOCOL) - task_args.push_back(static_pointer_cast[CTaskArgument, CTaskArgumentByValue](make_shared[CTaskArgumentByValue](pickled_str.c_str(), pickled_str.size()))) + pickled_str = pickle.dumps( + arg, protocol=pickle.HIGHEST_PROTOCOL) + task_args.push_back( + static_pointer_cast[CTaskArgument, + CTaskArgumentByValue]( + make_shared[CTaskArgumentByValue]( + pickled_str.c_str(), + pickled_str.size()))) for new_actor_handle in new_actor_handles: - task_new_actor_handles.push_back((new_actor_handle).data) + task_new_actor_handles.push_back( + (new_actor_handle).data) self.task_spec.reset(new CTaskSpecification( - CUniqueID(driver_id.data), parent_task_id.data, parent_counter, actor_creation_id.data, - actor_creation_dummy_object_id.data, max_actor_reconstructions, CUniqueID(actor_id.data), - CUniqueID(actor_handle_id.data), actor_counter, task_new_actor_handles, task_args, num_returns, - required_resources, required_placement_resources, LANGUAGE_PYTHON, - 
c_function_descriptor)) + CUniqueID(driver_id.data), parent_task_id.data, parent_counter, + actor_creation_id.data, actor_creation_dummy_object_id.data, + max_actor_reconstructions, CUniqueID(actor_id.data), + CUniqueID(actor_handle_id.data), actor_counter, + task_new_actor_handles, task_args, num_returns, + required_resources, required_placement_resources, + LANGUAGE_PYTHON, c_function_descriptor)) # Set the task's execution dependencies. self.execution_dependencies.reset(new c_vector[CObjectID]()) if execution_arguments is not None: for execution_arg in execution_arguments: - self.execution_dependencies.get().push_back((execution_arg).data) + self.execution_dependencies.get().push_back( + (execution_arg).data) @staticmethod cdef make(unique_ptr[CTaskSpecification]& task_spec): @@ -108,7 +123,8 @@ cdef class Task: return self.task_spec.get().SerializeAsString() def _serialized_raylet_task(self): - return SerializeTaskAsString(self.execution_dependencies.get(), self.task_spec.get()) + return SerializeTaskAsString( + self.execution_dependencies.get(), self.task_spec.get()) def driver_id(self): """Return the driver ID for this task.""" @@ -128,7 +144,8 @@ cdef class Task: def function_descriptor_list(self): """Return the function descriptor for this task.""" - cdef c_vector[c_string] function_descriptor = self.task_spec.get().FunctionDescriptor() + cdef c_vector[c_string] function_descriptor = ( + self.task_spec.get().FunctionDescriptor()) results = [] for i in range(function_descriptor.size()): results.append(function_descriptor[i]) @@ -148,9 +165,11 @@ cdef class Task: count = task_spec.ArgIdCount(i) if count > 0: assert count == 1 - arg_list.append(ObjectID.from_native(task_spec.ArgId(i, 0))) + arg_list.append( + ObjectID.from_native(task_spec.ArgId(i, 0))) else: - serialized_str = task_spec.ArgVal(i)[:task_spec.ArgValLength(i)] + serialized_str = ( + task_spec.ArgVal(i)[:task_spec.ArgValLength(i)]) obj = pickle.loads(serialized_str) arg_list.append(obj) elif lang 
== LANGUAGE_JAVA: @@ -169,15 +188,18 @@ cdef class Task: def required_resources(self): """Return the resource dictionary of the task.""" cdef: - unordered_map[c_string, double] resource_map = self.task_spec.get().GetRequiredResources().GetResourceMap() + unordered_map[c_string, double] resource_map = ( + self.task_spec.get().GetRequiredResources().GetResourceMap()) c_string resource_name double resource_value - unordered_map[c_string, double].iterator iterator = resource_map.begin() + unordered_map[c_string, double].iterator iterator = ( + resource_map.begin()) required_resources = {} while iterator != resource_map.end(): resource_name = dereference(iterator).first - py_resource_name = str(resource_name) # bytes for Py2, unicode for Py3 + # bytes for Py2, unicode for Py3 + py_resource_name = str(resource_name) resource_value = dereference(iterator).second required_resources[py_resource_name] = resource_value postincrement(iterator) @@ -193,7 +215,8 @@ cdef class Task: def actor_creation_dummy_object_id(self): """Return the actor creation dummy object ID for the task.""" - return ObjectID.from_native(self.task_spec.get().ActorCreationDummyObjectId()) + return ObjectID.from_native( + self.task_spec.get().ActorCreationDummyObjectId()) def actor_id(self): """Return the actor ID for this task.""" diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index da4abb3ac..fc36f9776 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -6,21 +6,25 @@ cdef extern from "ray/id.h" namespace "ray" nogil: cdef cppclass CUniqueID "ray::UniqueID": CUniqueID() CUniqueID(const CUniqueID &from_id) + @staticmethod CUniqueID from_random() + @staticmethod CUniqueID from_binary(const c_string & binary) + @staticmethod const CUniqueID nil() + size_t hash() const c_bool is_nil() const c_bool operator==(const CUniqueID& rhs) const c_bool operator!=(const CUniqueID& rhs) const const uint8_t *data() const - uint8_t 
*mutable_data(); - size_t size() const; - c_string binary() const; - c_string hex() const; + uint8_t *mutable_data() + size_t size() const + c_string binary() const + c_string hex() const ctypedef CUniqueID CActorCheckpointID ctypedef CUniqueID CActorClassID diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index be0bdd3f5..358d67b3e 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -32,7 +32,8 @@ def check_id(b): if not isinstance(b, bytes): raise TypeError("Unsupported type: " + str(type(b))) if len(b) != kUniqueIDSize: - raise ValueError("ID string needs to have length " + str(kUniqueIDSize)) + raise ValueError("ID string needs to have length " + + str(kUniqueIDSize)) cdef extern from "ray/constants.h" nogil: @@ -106,9 +107,10 @@ cdef class UniqueID: return type(self), (self.binary(),) def redis_shard_hash(self): - # NOTE: The hash function used here must match the one in GetRedisContext in - # src/ray/gcs/tables.h. Changes to the hash function should only be made - # through std::hash in src/common/common.h + # NOTE: The hash function used here must match the one in + # GetRedisContext in src/ray/gcs/tables.h. Changes to the + # hash function should only be made through std::hash in + # src/common/common.h return self.data.hash()