From fd835d107e938e29be1e55e15d3f425f6cb01f51 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 11 Jul 2019 17:07:04 +0800 Subject: [PATCH] Move task to common module and add checks in getter methods (#5147) --- BUILD.bazel | 71 +++++----- python/ray/includes/common.pxd | 4 +- python/ray/includes/task.pxd | 30 ++--- python/ray/includes/task.pxi | 26 +++- python/ray/worker.py | 19 ++- src/ray/{rpc/util.h => common/grpc_util.h} | 39 +++++- .../task}/scheduling_resources.cc | 4 - .../task}/scheduling_resources.h | 14 +- src/ray/{raylet => common/task}/task.cc | 11 +- src/ray/{raylet => common/task}/task.h | 20 ++- src/ray/common/task/task_common.h | 20 +++ .../task}/task_execution_spec.cc | 15 ++- .../task}/task_execution_spec.h | 19 +-- src/ray/{raylet => common/task}/task_spec.cc | 121 +++++++++--------- src/ray/{raylet => common/task}/task_spec.h | 43 +++---- src/ray/{raylet => common/task}/task_test.cc | 6 +- src/ray/{raylet => common/task}/task_util.h | 13 +- src/ray/core_worker/common.h | 14 +- src/ray/core_worker/context.cc | 6 +- src/ray/core_worker/context.h | 4 +- src/ray/core_worker/core_worker.cc | 2 +- src/ray/core_worker/core_worker.h | 8 +- src/ray/core_worker/task_execution.cc | 3 +- src/ray/core_worker/task_execution.h | 2 +- src/ray/core_worker/task_interface.cc | 6 +- src/ray/core_worker/task_interface.h | 12 +- .../core_worker/transport/raylet_transport.cc | 6 +- src/ray/core_worker/transport/transport.h | 4 +- src/ray/gcs/tables.cc | 2 +- src/ray/raylet/actor_registration.h | 2 +- ...org_ray_runtime_raylet_RayletClientImpl.cc | 4 +- src/ray/raylet/lineage_cache.h | 2 +- src/ray/raylet/lineage_cache_test.cc | 8 +- src/ray/raylet/main.cc | 11 +- src/ray/raylet/node_manager.cc | 19 +-- src/ray/raylet/node_manager.h | 9 +- src/ray/raylet/raylet.h | 2 +- src/ray/raylet/raylet_client.cc | 11 +- src/ray/raylet/raylet_client.h | 8 +- src/ray/raylet/scheduling_policy.h | 2 +- src/ray/raylet/scheduling_queue.cc | 8 +- src/ray/raylet/scheduling_queue.h | 4 +- src/ray/raylet/task_dependency_manager.h | 2 +- .../raylet/task_dependency_manager_test.cc | 2 +- src/ray/raylet/worker.h | 6 +- src/ray/raylet/worker_pool.cc | 2 +- src/ray/raylet/worker_pool.h | 6 +- src/ray/raylet/worker_pool_test.cc | 6 +- src/ray/rpc/client_call.h | 2 +- src/ray/rpc/message_wrapper.h | 45 ------- src/ray/rpc/server_call.h | 2 +- 51 files changed, 346 insertions(+), 361 deletions(-) rename src/ray/{rpc/util.h => common/grpc_util.h} (64%) rename src/ray/{raylet => common/task}/scheduling_resources.cc (99%) rename src/ray/{raylet => common/task}/scheduling_resources.h (98%) rename src/ray/{raylet => common/task}/task.cc (81%) rename src/ray/{raylet => common/task}/task.h (88%) create mode 100644 src/ray/common/task/task_common.h rename src/ray/{raylet => common/task}/task_execution_spec.cc (57%) rename src/ray/{raylet => common/task}/task_execution_spec.h (80%) rename src/ray/{raylet => common/task}/task_spec.cc (68%) rename src/ray/{raylet => common/task}/task_spec.h (87%) rename src/ray/{raylet => common/task}/task_test.cc (95%) rename src/ray/{raylet => common/task}/task_util.h (95%) delete mode 100644 src/ray/rpc/message_wrapper.h diff --git a/BUILD.bazel b/BUILD.bazel index c9ecaad87..95fd33a52 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -183,6 +183,42 @@ cc_library( # === End of rpc definitions === +cc_library( + name = "ray_common", + srcs = glob( + [ + "src/ray/common/**/*.cc", + ], + exclude = [ + "src/ray/common/**/*_test.cc", + ], + ), + hdrs = glob( + [ + "src/ray/common/**/*.h", + ], + ), + copts = COPTS, + deps = [ + ":common_cc_proto", + ":node_manager_fbs", + ":ray_util", + "@boost//:asio", + "@com_github_grpc_grpc//:grpc++", + "@plasma//:plasma_client", + ], +) + +cc_test( + name = "common_test", + srcs = glob(["src/ray/common/**/*_test.cc"]), + copts = COPTS, + deps = [ + ":ray_common", + "@com_google_googletest//:gtest_main", + ], +) + cc_binary( name = "raylet", srcs = ["src/ray/raylet/main.cc"], @@ -307,6 +343,8 @@ cc_library( ":core_worker_cc_proto", ":ray_common", ":ray_util", + # TODO(hchen): After `raylet_client` is migrated to gRPC, `core_worker_lib` + # should only depend on `raylet_client`, instead of the whole `raylet_lib`. ":raylet_lib", ":worker_rpc", ], @@ -387,16 +425,6 @@ cc_test( ], ) -cc_test( - name = "task_test", - srcs = ["src/ray/raylet/task_test.cc"], - copts = COPTS, - deps = [ - ":raylet_lib", - "@com_google_googletest//:gtest_main", - ], -) - cc_test( name = "client_connection_test", srcs = ["src/ray/raylet/client_connection_test.cc"], @@ -495,29 +523,6 @@ cc_library( ], ) -cc_library( - name = "ray_common", - srcs = glob( - [ - "src/ray/common/*.cc", - ], - exclude = [ - "src/ray/common/*_test.cc", - ], - ), - hdrs = glob( - [ - "src/ray/common/*.h", - ], - ), - copts = COPTS, - deps = [ - ":ray_util", - "@boost//:asio", - "@plasma//:plasma_client", - ], -) - cc_library( name = "sha256", srcs = [ diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index c9b95aaec..18a248304 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -100,8 +100,8 @@ cdef extern from "ray/protobuf/common.pb.h" namespace "Language" nogil: cdef CLanguage LANGUAGE_JAVA "Language::JAVA" -cdef extern from "ray/raylet/scheduling_resources.h" \ - namespace "ray::raylet" nogil: +cdef extern from "ray/common/task/scheduling_resources.h" \ + namespace "ray" nogil: cdef cppclass ResourceSet "ResourceSet": ResourceSet() ResourceSet(const unordered_map[c_string, double] &resource_map) diff --git a/python/ray/includes/task.pxd b/python/ray/includes/task.pxd index b648c325d..95a1b06ce 100644 --- a/python/ray/includes/task.pxd +++ b/python/ray/includes/task.pxd @@ -35,8 +35,8 @@ cdef extern from "ray/protobuf/gcs.pb.h" namespace "ray::rpc" nogil: const c_string &SerializeAsString() -cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil: - cdef cppclass CTaskSpec "ray::raylet::TaskSpecification": +cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil: + cdef cppclass CTaskSpec "ray::TaskSpecification": CTaskSpec(const RpcTaskSpec message) CTaskSpec(const c_string &serialized_binary) const RpcTaskSpec &GetMessage() @@ -61,7 +61,7 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil: const ResourceSet GetRequiredPlacementResources() const c_bool IsDriverTask() const CLanguage GetLanguage() const - + c_bool IsNormalTask() const c_bool IsActorCreationTask() const c_bool IsActorTask() const CActorID ActorCreationId() const @@ -74,38 +74,38 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil: c_vector[CActorHandleID] NewActorHandles() const -cdef extern from "ray/raylet/task_util.h" namespace "ray::raylet" nogil: - cdef cppclass TaskSpecBuilder "ray::raylet::TaskSpecBuilder": +cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil: + cdef cppclass TaskSpecBuilder "ray::TaskSpecBuilder": TaskSpecBuilder &SetCommonTaskSpec( const CLanguage &language, const c_vector[c_string] &function_descriptor, const CJobID &job_id, const CTaskID &parent_task_id, uint64_t parent_counter, uint64_t num_returns, const unordered_map[c_string, double] &required_resources, - const unordered_map[c_string, double] &required_placement_resources); + const unordered_map[c_string, double] &required_placement_resources) - TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id); + TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id) - TaskSpecBuilder &AddByValueArg(const c_string &data); + TaskSpecBuilder &AddByValueArg(const c_string &data) TaskSpecBuilder &SetActorCreationTaskSpec( const CActorID &actor_id, uint64_t max_reconstructions, - const c_vector[c_string] &dynamic_worker_options); + const c_vector[c_string] &dynamic_worker_options) TaskSpecBuilder &SetActorTaskSpec( const CActorID &actor_id, const CActorHandleID &actor_handle_id, const CObjectID &actor_creation_dummy_object_id, uint64_t actor_counter, - const c_vector[CActorHandleID] &new_handle_ids); + const c_vector[CActorHandleID] &new_handle_ids) - RpcTaskSpec GetMessage(); + RpcTaskSpec GetMessage() -cdef extern from "ray/raylet/task_execution_spec.h" namespace "ray::raylet" nogil: - cdef cppclass CTaskExecutionSpec "ray::raylet::TaskExecutionSpecification": +cdef extern from "ray/common/task/task_execution_spec.h" namespace "ray" nogil: + cdef cppclass CTaskExecutionSpec "ray::TaskExecutionSpecification": CTaskExecutionSpec(RpcTaskExecutionSpec message) CTaskExecutionSpec(const c_string &serialized_binary) const RpcTaskExecutionSpec &GetMessage() c_vector[CObjectID] ExecutionDependencies() uint64_t NumForwards() -cdef extern from "ray/raylet/task.h" namespace "ray::raylet" nogil: - cdef cppclass CTask "ray::raylet::Task": +cdef extern from "ray/common/task/task.h" namespace "ray" nogil: + cdef cppclass CTask "ray::Task": CTask(CTaskSpec task_spec, CTaskExecutionSpec task_execution_spec) diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index 42838b08a..dfad99205 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -15,7 +15,7 @@ from ray.includes.task cimport ( cdef class TaskSpec: - """Cython wrapper class of C++ `ray::raylet::TaskSpecification`.""" + """Cython wrapper class of C++ `ray::TaskSpecification`.""" cdef: unique_ptr[CTaskSpec] task_spec @@ -121,6 +121,18 @@ cdef class TaskSpec: """ return self.task_spec.get().Serialize() + def is_normal_task(self): + """Whether this task is a normal task.""" + return self.task_spec.get().IsNormalTask() + + def is_actor_task(self): + """Whether this task is an actor task.""" + return self.task_spec.get().IsActorTask() + + def is_actor_creation_task(self): + """Whether this task is an actor creation task.""" + return self.task_spec.get().IsActorCreationTask() + def job_id(self): """Return the job ID for this task.""" return JobID(self.task_spec.get().JobId().Binary()) @@ -206,24 +218,32 @@ cdef class TaskSpec: def actor_creation_id(self): """Return the actor creation ID for the task.""" + if not self.is_actor_creation_task(): + return ActorID.nil() return ActorID(self.task_spec.get().ActorCreationId().Binary()) def actor_creation_dummy_object_id(self): """Return the actor creation dummy object ID for the task.""" + if not self.is_actor_task(): + return ObjectID.nil() return ObjectID( self.task_spec.get().ActorCreationDummyObjectId().Binary()) def actor_id(self): """Return the actor ID for this task.""" + if not self.is_actor_task(): + return ActorID.nil() return ActorID(self.task_spec.get().ActorId().Binary()) def actor_counter(self): """Return the actor counter for this task.""" + if not self.is_actor_task(): + return 0 return self.task_spec.get().ActorCounter() cdef class TaskExecutionSpec: - """Cython wrapper class of C++ `ray::raylet::TaskExecutionSpecification`.""" + """Cython wrapper class of C++ `ray::TaskExecutionSpecification`.""" cdef: unique_ptr[CTaskExecutionSpec] c_spec @@ -259,7 +279,7 @@ cdef class TaskExecutionSpec: cdef class Task: - """Cython wrapper class of C++ `ray::raylet::Task`.""" + """Cython wrapper class of C++ `ray::Task`.""" cdef: unique_ptr[CTask] c_task diff --git a/python/ray/worker.py b/python/ray/worker.py index d35e2d326..29de380e4 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -868,7 +868,7 @@ class Worker(object): assert self.current_task_id.is_nil() assert self.task_context.task_index == 0 assert self.task_context.put_index == 1 - if task.actor_id().is_nil(): + if not task.is_actor_task(): # If this worker is not an actor, check that `current_job_id` # was reset when the worker finished the previous task. assert self.current_job_id.is_nil() @@ -887,8 +887,7 @@ class Worker(object): task.function_descriptor_list()) args = task.arguments() return_object_ids = task.returns() - if (not task.actor_id().is_nil() - or not task.actor_creation_id().is_nil()): + if task.is_actor_task() or task.is_actor_creation_task(): dummy_return_id = return_object_ids.pop() function_executor = function_execution_info.function function_name = function_execution_info.function_name @@ -911,11 +910,10 @@ class Worker(object): try: self._current_task = task with profiling.profile("task:execute"): - if (task.actor_id().is_nil() - and task.actor_creation_id().is_nil()): + if task.is_normal_task(): outputs = function_executor(*arguments) else: - if not task.actor_id().is_nil(): + if task.is_actor_task(): key = task.actor_id() else: key = task.actor_creation_id() @@ -924,7 +922,7 @@ class Worker(object): except Exception as e: # Determine whether the exception occured during a task, not an # actor method. - task_exception = task.actor_id().is_nil() + task_exception = not task.is_actor_task() traceback_str = ray.utils.format_error_message( traceback.format_exc(), task_exception=task_exception) self._handle_process_task_failure( @@ -980,8 +978,7 @@ class Worker(object): # TODO(rkn): It would be preferable for actor creation tasks to share # more of the code path with regular task execution. - if not task.actor_creation_id().is_nil(): - assert self.actor_id.is_nil() + if task.is_actor_creation_task(): self.actor_id = task.actor_creation_id() self.actor_creation_task_id = task.task_id() actor_class = self.function_actor_manager.load_actor_class( @@ -999,8 +996,8 @@ class Worker(object): # Execute the task. function_name = execution_info.function_name extra_data = {"name": function_name, "task_id": task.task_id().hex()} - if task.actor_id().is_nil(): - if task.actor_creation_id().is_nil(): + if not task.is_actor_task(): + if not task.is_actor_creation_task(): title = "ray_worker:{}()".format(function_name) next_title = "ray_worker" else: diff --git a/src/ray/rpc/util.h b/src/ray/common/grpc_util.h similarity index 64% rename from src/ray/rpc/util.h rename to src/ray/common/grpc_util.h index 97bfd0e9d..eb3e98c69 100644 --- a/src/ray/rpc/util.h +++ b/src/ray/common/grpc_util.h @@ -1,14 +1,44 @@ -#ifndef RAY_RPC_UTIL_H -#define RAY_RPC_UTIL_H +#ifndef RAY_COMMON_GRPC_UTIL_H +#define RAY_COMMON_GRPC_UTIL_H #include #include #include -#include "ray/common/status.h" +#include "status.h" namespace ray { -namespace rpc { + +/// Wrap a protobuf message. +template +class MessageWrapper { + public: + /// Construct an empty message wrapper. This should not be used directly. + MessageWrapper() {} + + /// Construct from a protobuf message object. + /// The input message will be **copied** into this object. + /// + /// \param message The protobuf message. + explicit MessageWrapper(const Message message) : message_(std::move(message)) {} + + /// Construct from protobuf-serialized binary. + /// + /// \param serialized_binary Protobuf-serialized binary. + explicit MessageWrapper(const std::string &serialized_binary) { + message_.ParseFromString(serialized_binary); + } + + /// Get reference of the protobuf message. + const Message &GetMessage() const { return message_; } + + /// Serialize the message to a string. + const std::string Serialize() const { return message_.SerializeAsString(); } + + protected: + /// The wrapped message. + Message message_; +}; /// Helper function that converts a ray status to gRPC status. inline grpc::Status RayStatusToGrpcStatus(const Status &ray_status) { @@ -60,7 +90,6 @@ inline std::unordered_map MapFromProtobuf(::google::protobuf::Map pb return std::unordered_map(pb_map.begin(), pb_map.end()); } -} // namespace rpc } // namespace ray #endif diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/common/task/scheduling_resources.cc similarity index 99% rename from src/ray/raylet/scheduling_resources.cc rename to src/ray/common/task/scheduling_resources.cc index 6adbbb37c..bbcdd6fec 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/common/task/scheduling_resources.cc @@ -7,8 +7,6 @@ namespace ray { -namespace raylet { - FractionalResourceQuantity::FractionalResourceQuantity() { resource_quantity_ = 0; } FractionalResourceQuantity::FractionalResourceQuantity(double resource_quantity) { @@ -785,6 +783,4 @@ std::string SchedulingResources::DebugString() const { return result.str(); }; -} // namespace raylet - } // namespace ray diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/common/task/scheduling_resources.h similarity index 98% rename from src/ray/raylet/scheduling_resources.h rename to src/ray/common/task/scheduling_resources.h index c06b8a194..355ad7573 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/common/task/scheduling_resources.h @@ -1,5 +1,5 @@ -#ifndef RAY_RAYLET_SCHEDULING_RESOURCES_H -#define RAY_RAYLET_SCHEDULING_RESOURCES_H +#ifndef RAY_COMMON_TASK_SCHEDULING_RESOURCES_H +#define RAY_COMMON_TASK_SCHEDULING_RESOURCES_H #include #include @@ -10,8 +10,6 @@ namespace ray { -namespace raylet { - /// Conversion factor that is the amount in internal units is equivalent to /// one actual resource. Multiply to convert from actual to interal and /// divide to convert from internal to actual. @@ -519,14 +517,12 @@ class SchedulingResources { ResourceSet resources_load_; }; -} // namespace raylet - } // namespace ray namespace std { template <> -struct hash { - size_t operator()(ray::raylet::ResourceSet const &k) const { +struct hash { + size_t operator()(ray::ResourceSet const &k) const { size_t seed = k.GetResourceMap().size(); for (auto &elem : k.GetResourceMap()) { seed ^= std::hash()(elem.first); @@ -537,4 +533,4 @@ struct hash { }; } // namespace std -#endif // RAY_RAYLET_SCHEDULING_RESOURCES_H +#endif // RAY_COMMON_TASK_SCHEDULING_RESOURCES_H diff --git a/src/ray/raylet/task.cc b/src/ray/common/task/task.cc similarity index 81% rename from src/ray/raylet/task.cc rename to src/ray/common/task/task.cc index 59f45d97e..3a603d72d 100644 --- a/src/ray/raylet/task.cc +++ b/src/ray/common/task/task.cc @@ -1,9 +1,9 @@ +#include + #include "task.h" namespace ray { -namespace raylet { - const TaskExecutionSpecification &Task::GetTaskExecutionSpec() const { return task_execution_spec_; } @@ -34,6 +34,11 @@ void Task::CopyTaskExecutionSpec(const Task &task) { ComputeDependencies(); } -} // namespace raylet +std::string Task::DebugString() const { + std::ostringstream stream; + stream << "task_spec={" << task_spec_.DebugString() << "}, task_execution_spec={" + << task_execution_spec_.DebugString() << "}"; + return stream.str(); +} } // namespace ray diff --git a/src/ray/raylet/task.h b/src/ray/common/task/task.h similarity index 88% rename from src/ray/raylet/task.h rename to src/ray/common/task/task.h index 0f80060fb..80e3f71eb 100644 --- a/src/ray/raylet/task.h +++ b/src/ray/common/task/task.h @@ -1,18 +1,14 @@ -#ifndef RAY_RAYLET_TASK_H -#define RAY_RAYLET_TASK_H +#ifndef RAY_COMMON_TASK_TASK_H +#define RAY_COMMON_TASK_TASK_H #include -#include "ray/protobuf/common.pb.h" -#include "ray/raylet/format/node_manager_generated.h" -#include "ray/raylet/task_execution_spec.h" -#include "ray/raylet/task_spec.h" -#include "ray/rpc/message_wrapper.h" +#include "ray/common/task/task_common.h" +#include "ray/common/task/task_execution_spec.h" +#include "ray/common/task/task_spec.h" namespace ray { -namespace raylet { - /// \class Task /// /// A Task represents a Ray task and a specification of its execution (e.g., @@ -66,6 +62,8 @@ class Task { /// \param task Task structure with updated dynamic information. void CopyTaskExecutionSpec(const Task &task); + std::string DebugString() const; + private: void ComputeDependencies(); @@ -82,8 +80,6 @@ class Task { std::vector dependencies_; }; -} // namespace raylet - } // namespace ray -#endif // RAY_RAYLET_TASK_H +#endif // RAY_COMMON_TASK_TASK_H diff --git a/src/ray/common/task/task_common.h b/src/ray/common/task/task_common.h new file mode 100644 index 000000000..e67820758 --- /dev/null +++ b/src/ray/common/task/task_common.h @@ -0,0 +1,20 @@ +#ifndef RAY_COMMON_TASK_TASK_COMMON_H +#define RAY_COMMON_TASK_TASK_COMMON_H + +#include "ray/protobuf/common.pb.h" + +namespace ray { + +// NOTE(hchen): Below we alias `ray::rpc::Language|TaskType)` in `ray` namespace. +// The reason is because other code should use them as if they were defined in this +// `task_common.h` file, shouldn't care about the implementation detail that they +// are defined in protobuf. + +/// See `common.proto` for definition of `Language` enum. +using Language = rpc::Language; +/// See `common.proto` for definition of `TaskType` enum. +using TaskType = rpc::TaskType; + +} // namespace ray + +#endif diff --git a/src/ray/raylet/task_execution_spec.cc b/src/ray/common/task/task_execution_spec.cc similarity index 57% rename from src/ray/raylet/task_execution_spec.cc rename to src/ray/common/task/task_execution_spec.cc index ed7e60e2c..849414389 100644 --- a/src/ray/raylet/task_execution_spec.cc +++ b/src/ray/common/task/task_execution_spec.cc @@ -1,11 +1,9 @@ -#include "ray/raylet/task_execution_spec.h" +#include + +#include "ray/common/task/task_execution_spec.h" namespace ray { -namespace raylet { - -using rpc::IdVectorFromProtobuf; - const std::vector TaskExecutionSpecification::ExecutionDependencies() const { return IdVectorFromProtobuf(message_.dependencies()); } @@ -16,6 +14,11 @@ void TaskExecutionSpecification::IncrementNumForwards() { message_.set_num_forwards(message_.num_forwards() + 1); } -} // namespace raylet +std::string TaskExecutionSpecification::DebugString() const { + std::ostringstream stream; + stream << "num_dependencies=" << message_.dependencies_size() + << ", num_forwards=" << message_.num_forwards(); + return stream.str(); +} } // namespace ray diff --git a/src/ray/raylet/task_execution_spec.h b/src/ray/common/task/task_execution_spec.h similarity index 80% rename from src/ray/raylet/task_execution_spec.h rename to src/ray/common/task/task_execution_spec.h index e78d064f5..44813c275 100644 --- a/src/ray/raylet/task_execution_spec.h +++ b/src/ray/common/task/task_execution_spec.h @@ -1,19 +1,14 @@ -#ifndef RAY_RAYLET_TASK_EXECUTION_SPECIFICATION_H -#define RAY_RAYLET_TASK_EXECUTION_SPECIFICATION_H +#ifndef RAY_COMMON_TASK_TASK_EXECUTION_SPEC_H +#define RAY_COMMON_TASK_TASK_EXECUTION_SPEC_H #include +#include "ray/common/grpc_util.h" #include "ray/common/id.h" -#include "ray/protobuf/common.pb.h" -#include "ray/rpc/message_wrapper.h" -#include "ray/rpc/util.h" +#include "ray/common/task/task_common.h" namespace ray { -namespace raylet { - -using rpc::MessageWrapper; - /// Wrapper class of protobuf `TaskExecutionSpec`, see `common.proto` for details. class TaskExecutionSpecification : public MessageWrapper { public: @@ -47,10 +42,10 @@ class TaskExecutionSpecification : public MessageWrapper /// Increment the number of times this task has been forwarded. void IncrementNumForwards(); -}; -} // namespace raylet + std::string DebugString() const; +}; } // namespace ray -#endif // RAY_RAYLET_TASK_EXECUTION_SPECIFICATION_H +#endif // RAY_COMMON_TASK_TASK_EXECUTION_SPEC_H diff --git a/src/ray/raylet/task_spec.cc b/src/ray/common/task/task_spec.cc similarity index 68% rename from src/ray/raylet/task_spec.cc rename to src/ray/common/task/task_spec.cc index ccee1d122..1d5bcb7aa 100644 --- a/src/ray/raylet/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -1,17 +1,10 @@ - #include -#include "ray/raylet/task_spec.h" -#include "ray/rpc/util.h" +#include "ray/common/task/task_spec.h" #include "ray/util/logging.h" namespace ray { -namespace raylet { - -using rpc::MapFromProtobuf; -using rpc::VectorFromProtobuf; - void TaskSpecification::ComputeResources() { auto required_resources = MapFromProtobuf(message_.required_resources()); auto required_placement_resources = @@ -40,20 +33,6 @@ std::vector TaskSpecification::FunctionDescriptor() const { return VectorFromProtobuf(message_.function_descriptor()); } -std::string TaskSpecification::FunctionDescriptorString() const { - auto list = VectorFromProtobuf(message_.function_descriptor()); - std::ostringstream stream; - // The 4th is the code hash which is binary bits. No need to output it. - size_t size = std::min(static_cast(3), list.size()); - for (int i = 0; i < size; ++i) { - if (i != 0) { - stream << ","; - } - stream << list[i]; - } - return stream.str(); -} - size_t TaskSpecification::NumArgs() const { return message_.args_size(); } size_t TaskSpecification::NumReturns() const { return message_.num_returns(); } @@ -95,79 +74,103 @@ bool TaskSpecification::IsDriverTask() const { return FunctionDescriptor().empty(); } -rpc::Language TaskSpecification::GetLanguage() const { return message_.language(); } +Language TaskSpecification::GetLanguage() const { return message_.language(); } + +bool TaskSpecification::IsNormalTask() const { + return message_.type() == TaskType::NORMAL_TASK; +} bool TaskSpecification::IsActorCreationTask() const { - return message_.type() == rpc::TaskType::ACTOR_CREATION_TASK; + return message_.type() == TaskType::ACTOR_CREATION_TASK; } bool TaskSpecification::IsActorTask() const { - return message_.type() == rpc::TaskType::ACTOR_TASK; + return message_.type() == TaskType::ACTOR_TASK; } +// === Below are getter methods specific to actor creation tasks. + ActorID TaskSpecification::ActorCreationId() const { - // TODO(hchen) Add a check to make sure this function can only be called if - // task is an actor creation task. - if (!IsActorCreationTask()) { - return ActorID::Nil(); - } + RAY_CHECK(IsActorCreationTask()); return ActorID::FromBinary(message_.actor_creation_task_spec().actor_id()); } -ObjectID TaskSpecification::ActorCreationDummyObjectId() const { - if (!IsActorTask()) { - return ObjectID::Nil(); - } - return ObjectID::FromBinary( - message_.actor_task_spec().actor_creation_dummy_object_id()); -} - uint64_t TaskSpecification::MaxActorReconstructions() const { - if (!IsActorCreationTask()) { - return 0; - } + RAY_CHECK(IsActorCreationTask()); return message_.actor_creation_task_spec().max_actor_reconstructions(); } +std::vector TaskSpecification::DynamicWorkerOptions() const { + RAY_CHECK(IsActorCreationTask()); + return VectorFromProtobuf(message_.actor_creation_task_spec().dynamic_worker_options()); +} + +// === Below are getter methods specific to actor tasks. + ActorID TaskSpecification::ActorId() const { - if (!IsActorTask()) { - return ActorID::Nil(); - } + RAY_CHECK(IsActorTask()); return ActorID::FromBinary(message_.actor_task_spec().actor_id()); } ActorHandleID TaskSpecification::ActorHandleId() const { - if (!IsActorTask()) { - return ActorHandleID::Nil(); - } + RAY_CHECK(IsActorTask()); return ActorHandleID::FromBinary(message_.actor_task_spec().actor_handle_id()); } uint64_t TaskSpecification::ActorCounter() const { - if (!IsActorTask()) { - return 0; - } + RAY_CHECK(IsActorTask()); return message_.actor_task_spec().actor_counter(); } +ObjectID TaskSpecification::ActorCreationDummyObjectId() const { + RAY_CHECK(IsActorTask()); + return ObjectID::FromBinary( + message_.actor_task_spec().actor_creation_dummy_object_id()); +} + ObjectID TaskSpecification::ActorDummyObject() const { RAY_CHECK(IsActorTask() || IsActorCreationTask()); return ReturnId(NumReturns() - 1); } std::vector TaskSpecification::NewActorHandles() const { - if (!IsActorTask()) { - return {}; - } - return rpc::IdVectorFromProtobuf( + RAY_CHECK(IsActorTask()); + return IdVectorFromProtobuf( message_.actor_task_spec().new_actor_handles()); } -std::vector TaskSpecification::DynamicWorkerOptions() const { - return rpc::VectorFromProtobuf( - message_.actor_creation_task_spec().dynamic_worker_options()); +std::string TaskSpecification::DebugString() const { + std::ostringstream stream; + stream << "Type=" << TaskType_Name(message_.type()) + << ", Language=" << Language_Name(message_.language()) + << ", function_descriptor="; + + // Print function descriptor. + const auto list = VectorFromProtobuf(message_.function_descriptor()); + // The 4th is the code hash which is binary bits. No need to output it. + const size_t size = std::min(static_cast(3), list.size()); + for (int i = 0; i < size; ++i) { + if (i != 0) { + stream << ","; + } + stream << list[i]; + } + + stream << ", task_id=" << TaskId() << ", job_id=" << JobId() + << ", num_args=" << NumArgs() << ", num_returns=" << NumReturns(); + + if (IsActorCreationTask()) { + // Print actor creation task spec. + stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId() + << ", max_reconstructions=" << MaxActorReconstructions() << "}"; + } else if (IsActorTask()) { + // Print actor task spec. + stream << ", actor_task_spec={actor_id=" << ActorId() + << ", actor_handle_id=" << ActorHandleId() + << ", actor_counter=" << ActorCounter() << "}"; + } + + return stream.str(); } -} // namespace raylet - } // namespace ray diff --git a/src/ray/raylet/task_spec.h b/src/ray/common/task/task_spec.h similarity index 87% rename from src/ray/raylet/task_spec.h rename to src/ray/common/task/task_spec.h index ea72efc15..376433df4 100644 --- a/src/ray/raylet/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -1,15 +1,15 @@ -#ifndef RAY_RAYLET_TASK_SPECIFICATION_H -#define RAY_RAYLET_TASK_SPECIFICATION_H +#ifndef RAY_COMMON_TASK_TASK_SPEC_H +#define RAY_COMMON_TASK_TASK_SPEC_H #include #include #include #include +#include "ray/common/grpc_util.h" #include "ray/common/id.h" -#include "ray/protobuf/common.pb.h" -#include "ray/raylet/scheduling_resources.h" -#include "ray/rpc/message_wrapper.h" +#include "ray/common/task/scheduling_resources.h" +#include "ray/common/task/task_common.h" extern "C" { #include "ray/thirdparty/sha256.h" @@ -17,12 +17,6 @@ extern "C" { namespace ray { -namespace raylet { - -using rpc::Language; -using rpc::MessageWrapper; -using rpc::TaskType; - /// Wrapper class of protobuf `TaskSpec`, see `common.proto` for details. class TaskSpecification : public MessageWrapper { public: @@ -56,9 +50,6 @@ class TaskSpecification : public MessageWrapper { std::vector FunctionDescriptor() const; - // Output the function descriptor as a string for log purpose. - std::string FunctionDescriptorString() const; - size_t NumArgs() const; size_t NumReturns() const; @@ -97,28 +88,38 @@ class TaskSpecification : public MessageWrapper { Language GetLanguage() const; - // Methods specific to actor tasks. + /// Whether this task is a normal task. + bool IsNormalTask() const; + + /// Whether this task is an actor creation task. bool IsActorCreationTask() const; + /// Whether this task is an actor task. bool IsActorTask() const; + // Methods specific to actor creation tasks. + ActorID ActorCreationId() const; - ObjectID ActorCreationDummyObjectId() const; - uint64_t MaxActorReconstructions() const; + std::vector DynamicWorkerOptions() const; + + // Methods specific to actor tasks. + ActorID ActorId() const; ActorHandleID ActorHandleId() const; uint64_t ActorCounter() const; - ObjectID ActorDummyObject() const; + ObjectID ActorCreationDummyObjectId() const; std::vector NewActorHandles() const; - std::vector DynamicWorkerOptions() const; + ObjectID ActorDummyObject() const; + + std::string DebugString() const; private: void ComputeResources(); @@ -128,8 +129,6 @@ class TaskSpecification : public MessageWrapper { ResourceSet required_placement_resources_; }; -} // namespace raylet - } // namespace ray -#endif // RAY_RAYLET_TASK_SPECIFICATION_H +#endif // RAY_COMMON_TASK_TASK_SPEC_H diff --git a/src/ray/raylet/task_test.cc b/src/ray/common/task/task_test.cc similarity index 95% rename from src/ray/raylet/task_test.cc rename to src/ray/common/task/task_test.cc index 524c9aaca..a1d4c6d74 100644 --- a/src/ray/raylet/task_test.cc +++ b/src/ray/common/task/task_test.cc @@ -1,12 +1,10 @@ #include "gtest/gtest.h" #include "ray/common/common_protocol.h" -#include "ray/raylet/task_spec.h" +#include "ray/common/task/task_spec.h" namespace ray { -namespace raylet { - void TestTaskReturnId(const TaskID &task_id, int64_t return_index) { // Round trip test for computing the object ID for a task's return value, // then computing the task ID that created the object. @@ -48,8 +46,6 @@ TEST(IdPropertyTest, TestIdProperty) { ASSERT_TRUE(ObjectID::Nil().IsNil()); } -} // namespace raylet - } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/raylet/task_util.h b/src/ray/common/task/task_util.h similarity index 95% rename from src/ray/raylet/task_util.h rename to src/ray/common/task/task_util.h index 63c0ed890..21c084ae5 100644 --- a/src/ray/raylet/task_util.h +++ b/src/ray/common/task/task_util.h @@ -1,13 +1,11 @@ -#ifndef RAY_RAYLET_TASK_UTIL_H -#define RAY_RAYLET_TASK_UTIL_H +#ifndef RAY_COMMON_TASK_TASK_UTIL_H +#define RAY_COMMON_TASK_TASK_UTIL_H +#include "ray/common/task/task_spec.h" #include "ray/protobuf/common.pb.h" -#include "ray/raylet/task_spec.h" namespace ray { -namespace raylet { - /// Helper class for building a `TaskSpecification` object. class TaskSpecBuilder { public: @@ -27,7 +25,7 @@ class TaskSpecBuilder { uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources) { - message_.set_type(rpc::TaskType::NORMAL_TASK); + message_.set_type(TaskType::NORMAL_TASK); message_.set_language(language); for (const auto &fd : function_descriptor) { message_.add_function_descriptor(fd); @@ -114,7 +112,6 @@ class TaskSpecBuilder { rpc::TaskSpec message_; }; -} // namespace raylet } // namespace ray -#endif // RAY_RAYLET_TASK_UTIL_H +#endif // RAY_COMMON_TASK_TASK_UTIL_H diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index d15e7c8d8..2e32e6ecd 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -5,14 +5,11 @@ #include "ray/common/buffer.h" #include "ray/common/id.h" +#include "ray/common/task/task_spec.h" #include "ray/raylet/raylet_client.h" -#include "ray/raylet/task_spec.h" namespace ray { -using rpc::Language; -using rpc::TaskType; - /// Type of this worker. enum class WorkerType { WORKER, DRIVER }; @@ -83,21 +80,20 @@ struct TaskInfo { /// TODO(zhijunfu): this can be removed after everything is moved to protobuf. class TaskSpec { public: - TaskSpec(const raylet::TaskSpecification &task_spec, - const std::vector &dependencies) + TaskSpec(const TaskSpecification &task_spec, const std::vector &dependencies) : task_spec_(task_spec), dependencies_(dependencies) {} - TaskSpec(const raylet::TaskSpecification &&task_spec, + TaskSpec(const TaskSpecification &&task_spec, const std::vector &&dependencies) : task_spec_(task_spec), dependencies_(dependencies) {} - const raylet::TaskSpecification &GetTaskSpecification() const { return task_spec_; } + const TaskSpecification &GetTaskSpecification() const { return task_spec_; } const std::vector &GetDependencies() const { return dependencies_; } private: /// Raylet task specification. - raylet::TaskSpecification task_spec_; + TaskSpecification task_spec_; /// Dependencies. std::vector dependencies_; diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index b8aac0da4..9083fe2b9 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -20,9 +20,7 @@ struct WorkerThreadContext { put_index = 0; } - void SetCurrentTask(const raylet::TaskSpecification &spec) { - SetCurrentTask(spec.TaskId()); - } + void SetCurrentTask(const TaskSpecification &spec) { SetCurrentTask(spec.TaskId()); } private: /// The task ID for current task. @@ -64,7 +62,7 @@ const TaskID &WorkerContext::GetCurrentTaskID() const { return GetThreadContext().GetCurrentTaskID(); } -void WorkerContext::SetCurrentTask(const raylet::TaskSpecification &spec) { +void WorkerContext::SetCurrentTask(const TaskSpecification &spec) { current_job_id = spec.JobId(); GetThreadContext().SetCurrentTask(spec); } diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 5bfb830af..a8330055f 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -1,8 +1,8 @@ #ifndef RAY_CORE_WORKER_CONTEXT_H #define RAY_CORE_WORKER_CONTEXT_H +#include "ray/common/task/task_spec.h" #include "ray/core_worker/common.h" -#include "ray/raylet/task_spec.h" namespace ray { @@ -20,7 +20,7 @@ class WorkerContext { const TaskID &GetCurrentTaskID() const; - void SetCurrentTask(const raylet::TaskSpecification &spec); + void SetCurrentTask(const TaskSpecification &spec); int GetNextTaskIndex(); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 661996630..5b14795fd 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -3,7 +3,7 @@ namespace ray { -CoreWorker::CoreWorker(const enum WorkerType worker_type, const enum Language language, +CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, const std::string &store_socket, const std::string &raylet_socket, const JobID &job_id) : worker_type_(worker_type), diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c61b50c78..3798b8c4f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -27,10 +27,10 @@ class CoreWorker { const JobID &job_id = JobID::Nil()); /// Type of this worker. - enum WorkerType WorkerType() const { return worker_type_; } + WorkerType GetWorkerType() const { return worker_type_; } /// Language of this worker. - enum Language Language() const { return language_; } + Language GetLanguage() const { return language_; } /// Return the `CoreWorkerTaskInterface` that contains the methods related to task /// submisson. @@ -49,10 +49,10 @@ class CoreWorker { private: /// Type of this worker. - const enum WorkerType worker_type_; + const WorkerType worker_type_; /// Language of this worker. - const enum Language language_; + const Language language_; /// raylet socket name. const std::string raylet_socket_; diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index bbcfb39ef..19ed37f69 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -71,8 +71,7 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { } Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( - const raylet::TaskSpecification &spec, - std::vector> *args) { + const TaskSpecification &spec, std::vector> *args) { auto num_args = spec.NumArgs(); (*args).resize(num_args); diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index 27f43e9fb..b154b8f01 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -50,7 +50,7 @@ class CoreWorkerTaskExecutionInterface { /// \param spec[in] Task specification. /// \param args[out] The arguments for passing to task executor. /// - Status BuildArgsForExecutor(const raylet::TaskSpecification &spec, + Status BuildArgsForExecutor(const TaskSpecification &spec, std::vector> *args); /// Reference to the parent CoreWorker's context. diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index da6bf25fa..6086e341f 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -33,7 +33,7 @@ ray::ActorHandleID ActorHandle::ActorHandleID() const { Language ActorHandle::ActorLanguage() const { return inner_.actor_language(); }; std::vector ActorHandle::ActorCreationTaskFunctionDescriptor() const { - return ray::rpc::VectorFromProtobuf(inner_.actor_creation_task_function_descriptor()); + return VectorFromProtobuf(inner_.actor_creation_task_function_descriptor()); }; ObjectID ActorHandle::ActorCursor() const { @@ -98,12 +98,12 @@ CoreWorkerTaskInterface::CoreWorkerTaskInterface( new CoreWorkerRayletTaskSubmitter(raylet_client))); } -raylet::TaskSpecBuilder CoreWorkerTaskInterface::BuildCommonTaskSpec( +TaskSpecBuilder CoreWorkerTaskInterface::BuildCommonTaskSpec( const RayFunction &function, const std::vector &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, std::vector *return_ids) { - raylet::TaskSpecBuilder builder; + TaskSpecBuilder builder; auto next_task_index = worker_context_.GetNextTaskIndex(); // Build common task spec. builder.SetCommonTaskSpec( diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index 85c6d8ddb..2fea4fd68 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -2,21 +2,19 @@ #define RAY_CORE_WORKER_TASK_INTERFACE_H #include "ray/common/buffer.h" +#include "ray/common/grpc_util.h" #include "ray/common/id.h" #include "ray/common/status.h" +#include "ray/common/task/task.h" +#include "ray/common/task/task_spec.h" +#include "ray/common/task/task_util.h" #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/transport/transport.h" #include "ray/protobuf/core_worker.pb.h" -#include "ray/raylet/task.h" -#include "ray/raylet/task_spec.h" -#include "ray/raylet/task_util.h" -#include "ray/rpc/util.h" namespace ray { -using rpc::Language; - class CoreWorker; /// Options of a non-actor-creation task. @@ -163,7 +161,7 @@ class CoreWorkerTaskInterface { /// node. /// \param[out] return_ids Return IDs. /// \return A `TaskSpecBuilder`. - raylet::TaskSpecBuilder BuildCommonTaskSpec( + TaskSpecBuilder BuildCommonTaskSpec( const RayFunction &function, const std::vector &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index 11f66dc04..ece982701 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -1,6 +1,6 @@ #include "ray/core_worker/transport/raylet_transport.h" -#include "ray/raylet/task.h" +#include "ray/common/task/task.h" namespace ray { @@ -13,7 +13,7 @@ Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) { } Status CoreWorkerRayletTaskReceiver::GetTasks(std::vector *tasks) { - std::unique_ptr task_spec; + std::unique_ptr task_spec; auto status = raylet_client_->GetTask(&task_spec); if (!status.ok()) { RAY_LOG(ERROR) << "Get task from raylet failed with error: " @@ -38,7 +38,7 @@ CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver( void CoreWorkerRayletTaskReceiver::HandleAssignTask( const rpc::AssignTaskRequest &request, rpc::AssignTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { - const raylet::Task task(request.task()); + const Task task(request.task()); const auto &spec = task.GetTaskSpecification(); auto status = task_handler_(spec); send_reply_callback(status, nullptr, nullptr); diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h index 8433b3b17..fc67a2d41 100644 --- a/src/ray/core_worker/transport/transport.h +++ b/src/ray/core_worker/transport/transport.h @@ -6,8 +6,8 @@ #include "ray/common/buffer.h" #include "ray/common/id.h" #include "ray/common/status.h" +#include "ray/common/task/task_spec.h" #include "ray/core_worker/common.h" -#include "ray/raylet/task_spec.h" namespace ray { @@ -32,7 +32,7 @@ class CoreWorkerTaskSubmitter { /// This class receives tasks for execution. class CoreWorkerTaskReceiver { public: - using TaskHandler = std::function; + using TaskHandler = std::function; // Get tasks for execution. virtual Status GetTasks(std::vector *tasks) = 0; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index cc555420b..7a07cd879 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -1,9 +1,9 @@ #include "ray/gcs/tables.h" #include "ray/common/common_protocol.h" +#include "ray/common/grpc_util.h" #include "ray/common/ray_config.h" #include "ray/gcs/client.h" -#include "ray/rpc/util.h" #include "ray/util/util.h" namespace { diff --git a/src/ray/raylet/actor_registration.h b/src/ray/raylet/actor_registration.h index 7a7ce8e17..67bd394e8 100644 --- a/src/ray/raylet/actor_registration.h +++ b/src/ray/raylet/actor_registration.h @@ -4,8 +4,8 @@ #include #include "ray/common/id.h" +#include "ray/common/task/task.h" #include "ray/protobuf/gcs.pb.h" -#include "ray/raylet/task.h" namespace ray { diff --git a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc index a37254222..5080f2edf 100644 --- a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc +++ b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc @@ -74,7 +74,7 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmit task_spec_message.ParseFromArray(data, size); env->ReleaseByteArrayElements(taskSpec, data, JNI_ABORT); - ray::raylet::TaskSpecification task_spec(task_spec_message); + ray::TaskSpecification task_spec(task_spec_message); auto status = raylet_client->SubmitTask(execution_dependencies, task_spec); ThrowRayExceptionIfNotOK(env, status); } @@ -88,7 +88,7 @@ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_native JNIEnv *env, jclass, jlong client) { auto raylet_client = reinterpret_cast(client); - std::unique_ptr spec; + std::unique_ptr spec; auto status = raylet_client->GetTask(&spec); if (ThrowRayExceptionIfNotOK(env, status)) { return nullptr; diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 6ff3fb5fc..c30bd2da4 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -6,8 +6,8 @@ #include "ray/common/id.h" #include "ray/common/status.h" +#include "ray/common/task/task.h" #include "ray/gcs/tables.h" -#include "ray/raylet/task.h" namespace ray { diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index ad3f1d7a2..19ac1918c 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -3,12 +3,12 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "ray/common/task/task.h" +#include "ray/common/task/task_execution_spec.h" +#include "ray/common/task/task_spec.h" +#include "ray/common/task/task_util.h" #include "ray/raylet/format/node_manager_generated.h" #include "ray/raylet/lineage_cache.h" -#include "ray/raylet/task.h" -#include "ray/raylet/task_execution_spec.h" -#include "ray/raylet/task_spec.h" -#include "ray/raylet/task_util.h" namespace ray { diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 664b2a513..f8c82c739 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -2,14 +2,12 @@ #include "ray/common/ray_config.h" #include "ray/common/status.h" -#include "ray/protobuf/common.pb.h" +#include "ray/common/task/task_common.h" #include "ray/raylet/raylet.h" #include "ray/stats/stats.h" #include "gflags/gflags.h" -using ray::rpc::Language; - DEFINE_string(raylet_socket_name, "", "The socket name of raylet."); DEFINE_string(store_socket_name, "", "The socket name of object store."); DEFINE_int32(object_manager_port, -1, "The port of object manager."); @@ -107,8 +105,7 @@ int main(int argc, char *argv[]) { static_resource_conf[resource_name] = std::stod(resource_quantity); } - node_manager_config.resource_config = - ray::raylet::ResourceSet(std::move(static_resource_conf)); + node_manager_config.resource_config = ray::ResourceSet(std::move(static_resource_conf)); RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: " << node_manager_config.resource_config.ToString(); node_manager_config.node_manager_address = node_ip_address; @@ -120,11 +117,11 @@ int main(int argc, char *argv[]) { if (!python_worker_command.empty()) { node_manager_config.worker_commands.emplace( - make_pair(Language::PYTHON, parse_worker_command(python_worker_command))); + make_pair(ray::Language::PYTHON, parse_worker_command(python_worker_command))); } if (!java_worker_command.empty()) { node_manager_config.worker_commands.emplace( - make_pair(Language::JAVA, parse_worker_command(java_worker_command))); + make_pair(ray::Language::JAVA, parse_worker_command(java_worker_command))); } if (python_worker_command.empty() && java_worker_command.empty()) { RAY_CHECK(0) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 280d718ad..d6d15437b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -547,11 +547,10 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id, SchedulingResources &remote_resources = it->second; ResourceSet remote_available( - rpc::VectorFromProtobuf(heartbeat_data.resources_total_label()), - rpc::VectorFromProtobuf(heartbeat_data.resources_total_capacity())); - ResourceSet remote_load( - rpc::VectorFromProtobuf(heartbeat_data.resource_load_label()), - rpc::VectorFromProtobuf(heartbeat_data.resource_load_capacity())); + VectorFromProtobuf(heartbeat_data.resources_total_label()), + VectorFromProtobuf(heartbeat_data.resources_total_capacity())); + ResourceSet remote_load(VectorFromProtobuf(heartbeat_data.resource_load_label()), + VectorFromProtobuf(heartbeat_data.resource_load_capacity())); // TODO(atumanov): assert that the load is a non-empty ResourceSet. remote_resources.SetAvailableResources(std::move(remote_available)); // Extract the load information and save it locally. @@ -1475,13 +1474,7 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag stats::TaskCountReceived().Record(1); const TaskSpecification &spec = task.GetTaskSpecification(); const TaskID &task_id = spec.TaskId(); - RAY_LOG(DEBUG) << "Submitting task: task_id=" << task_id - << ", actor_id=" << spec.ActorId() - << ", actor_creation_id=" << spec.ActorCreationId() - << ", actor_handle_id=" << spec.ActorHandleId() - << ", actor_counter=" << spec.ActorCounter() - << ", task_descriptor=" << spec.FunctionDescriptorString() << " on node " - << gcs_client_->client_table().GetLocalClientId(); + RAY_LOG(DEBUG) << "Submitting task: " << task.DebugString(); if (local_queues_.HasTask(task_id)) { RAY_LOG(WARNING) << "Submitted task " << task_id @@ -1824,8 +1817,6 @@ bool NodeManager::AssignTask(const Task &task) { // guarantee that tasks are replayed in the same order after a // failure, we must update the task's execution dependency to be // the actor's current execution dependency. - } else { - RAY_CHECK(spec.NewActorHandles().empty()); } // Mark the task as running. diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 7fb10d642..a3a0243b6 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -7,15 +7,15 @@ #include "ray/rpc/client_call.h" #include "ray/rpc/node_manager/node_manager_server.h" #include "ray/rpc/node_manager/node_manager_client.h" -#include "ray/raylet/task.h" -#include "ray/object_manager/object_manager.h" +#include "ray/common/task/task.h" #include "ray/common/client_connection.h" -#include "ray/protobuf/common.pb.h" +#include "ray/common/task/task_common.h" +#include "ray/common/task/scheduling_resources.h" +#include "ray/object_manager/object_manager.h" #include "ray/raylet/actor_registration.h" #include "ray/raylet/lineage_cache.h" #include "ray/raylet/scheduling_policy.h" #include "ray/raylet/scheduling_queue.h" -#include "ray/raylet/scheduling_resources.h" #include "ray/raylet/reconstruction_policy.h" #include "ray/raylet/task_dependency_manager.h" #include "ray/raylet/worker_pool.h" @@ -32,7 +32,6 @@ using rpc::ErrorType; using rpc::HeartbeatBatchTableData; using rpc::HeartbeatTableData; using rpc::JobTableData; -using rpc::Language; struct NodeManagerConfig { /// The node's resource configuration. diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index 24826c6e6..01395141f 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -9,7 +9,7 @@ // clang-format off #include "ray/raylet/node_manager.h" #include "ray/object_manager/object_manager.h" -#include "ray/raylet/scheduling_resources.h" +#include "ray/common/task/scheduling_resources.h" // clang-format on namespace ray { diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index e6bb6b740..d16a88ed5 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -14,8 +14,8 @@ #include "ray/common/common_protocol.h" #include "ray/common/ray_config.h" +#include "ray/common/task/task_spec.h" #include "ray/raylet/format/node_manager_generated.h" -#include "ray/raylet/task_spec.h" #include "ray/util/logging.h" using MessageType = ray::protocol::MessageType; @@ -224,7 +224,7 @@ RayletClient::RayletClient(const std::string &raylet_socket, const ClientID &cli } ray::Status RayletClient::SubmitTask(const std::vector &execution_dependencies, - const ray::raylet::TaskSpecification &task_spec) { + const ray::TaskSpecification &task_spec) { flatbuffers::FlatBufferBuilder fbb; auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies); auto message = ray::protocol::CreateSubmitTaskRequest( @@ -233,8 +233,7 @@ ray::Status RayletClient::SubmitTask(const std::vector &execution_depe return conn_->WriteMessage(MessageType::SubmitTask, &fbb); } -ray::Status RayletClient::GetTask( - std::unique_ptr *task_spec) { +ray::Status RayletClient::GetTask(std::unique_ptr *task_spec) { std::unique_ptr reply; // Receive a task from the raylet. This will block until the raylet // gives this client a task. @@ -267,8 +266,8 @@ ray::Status RayletClient::GetTask( } // Return the copy of the task spec and pass ownership to the caller. - task_spec->reset(new ray::raylet::TaskSpecification( - string_from_flatbuf(*reply_message->task_spec()))); + task_spec->reset( + new ray::TaskSpecification(string_from_flatbuf(*reply_message->task_spec()))); return ray::Status::OK(); } diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index 3620c9d9c..fb96a6fd8 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -8,7 +8,7 @@ #include #include "ray/common/status.h" -#include "ray/raylet/task_spec.h" +#include "ray/common/task/task_spec.h" using ray::ActorCheckpointID; using ray::ActorID; @@ -18,7 +18,7 @@ using ray::ObjectID; using ray::TaskID; using ray::UniqueID; -using ray::rpc::Language; +using ray::Language; using ray::rpc::ProfileTableData; using MessageType = ray::protocol::MessageType; @@ -84,7 +84,7 @@ class RayletClient { /// \param The task specification. /// \return ray::Status. ray::Status SubmitTask(const std::vector &execution_dependencies, - const ray::raylet::TaskSpecification &task_spec); + const ray::TaskSpecification &task_spec); /// Get next task for this client. This will block until the scheduler assigns /// a task to this worker. The caller takes ownership of the returned task @@ -92,7 +92,7 @@ class RayletClient { /// /// \param task_spec The assigned task. /// \return ray::Status. - ray::Status GetTask(std::unique_ptr *task_spec); + ray::Status GetTask(std::unique_ptr *task_spec); /// Tell the raylet that the client has finished executing a task. /// diff --git a/src/ray/raylet/scheduling_policy.h b/src/ray/raylet/scheduling_policy.h index 7449f8ba0..5c95880a4 100644 --- a/src/ray/raylet/scheduling_policy.h +++ b/src/ray/raylet/scheduling_policy.h @@ -4,8 +4,8 @@ #include #include +#include "ray/common/task/scheduling_resources.h" #include "ray/raylet/scheduling_queue.h" -#include "ray/raylet/scheduling_resources.h" namespace ray { diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 9270b4798..5a0f91dd7 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -39,7 +39,7 @@ inline void GetActorTasksFromQueue(const TaskQueue &queue, const ray::ActorID &a const auto &tasks = queue.GetTasks(); for (const auto &task : tasks) { auto const &spec = task.GetTaskSpecification(); - if (actor_id == spec.ActorId()) { + if (spec.IsActorTask() && actor_id == spec.ActorId()) { task_ids.insert(spec.TaskId()); } } @@ -211,9 +211,9 @@ const std::shared_ptr &SchedulingQueue::GetTaskQueue( // Helper function to remove tasks in the given set of task_ids from a // queue, and append them to the given vector removed_tasks. -void SchedulingQueue::RemoveTasksFromQueue( - ray::raylet::TaskState task_state, std::unordered_set &task_ids, - std::vector *removed_tasks) { +void SchedulingQueue::RemoveTasksFromQueue(ray::raylet::TaskState task_state, + std::unordered_set &task_ids, + std::vector *removed_tasks) { auto &queue = GetTaskQueue(task_state); for (auto it = task_ids.begin(); it != task_ids.end();) { const auto &task_id = *it; diff --git a/src/ray/raylet/scheduling_queue.h b/src/ray/raylet/scheduling_queue.h index 4a6b4f50b..2c503dc38 100644 --- a/src/ray/raylet/scheduling_queue.h +++ b/src/ray/raylet/scheduling_queue.h @@ -7,7 +7,7 @@ #include #include -#include "ray/raylet/task.h" +#include "ray/common/task/task.h" #include "ray/util/logging.h" #include "ray/util/ordered_set.h" @@ -321,7 +321,7 @@ class SchedulingQueue { /// TaskState::kNumTaskQueues). void RemoveTasksFromQueue(ray::raylet::TaskState task_state, std::unordered_set &task_ids, - std::vector *removed_tasks); + std::vector *removed_tasks); /// A helper function to filter out tasks of a given state from the set of /// task IDs. The requested task state must correspond to one of the task diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index a96558295..964c963f0 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -3,7 +3,7 @@ // clang-format off #include "ray/common/id.h" -#include "ray/raylet/task.h" +#include "ray/common/task/task.h" #include "ray/object_manager/object_manager.h" #include "ray/raylet/reconstruction_policy.h" #include "ray/util/util.h" diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 1a469c596..86136e201 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -5,8 +5,8 @@ #include +#include "ray/common/task/task_util.h" #include "ray/raylet/task_dependency_manager.h" -#include "ray/raylet/task_util.h" namespace ray { diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 67deb04d0..81758f992 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -5,15 +5,13 @@ #include "ray/common/client_connection.h" #include "ray/common/id.h" -#include "ray/protobuf/common.pb.h" -#include "ray/raylet/scheduling_resources.h" +#include "ray/common/task/scheduling_resources.h" +#include "ray/common/task/task_common.h" namespace ray { namespace raylet { -using rpc::Language; - /// Worker class encapsulates the implementation details of a worker. A worker /// is the execution container around a unit of Ray work, such as a task or an /// actor. Ray units of work execute in the context of a Worker. diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 91552287a..d9cb37843 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -244,7 +244,6 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec) { auto &state = GetStateForLanguage(task_spec.GetLanguage()); - const auto &actor_id = task_spec.ActorId(); std::shared_ptr worker = nullptr; int pid = -1; @@ -281,6 +280,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec } } else { // Code path of actor task. + const auto &actor_id = task_spec.ActorId(); auto actor_entry = state.idle_actor.find(actor_id); if (actor_entry != state.idle_actor.end()) { worker = std::move(actor_entry->second); diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 32f6cb042..a243d53a7 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -7,17 +7,15 @@ #include #include "ray/common/client_connection.h" +#include "ray/common/task/task.h" +#include "ray/common/task/task_common.h" #include "ray/gcs/client.h" -#include "ray/protobuf/common.pb.h" -#include "ray/raylet/task.h" #include "ray/raylet/worker.h" namespace ray { namespace raylet { -using rpc::Language; - using WorkerCommandMap = std::unordered_map, std::hash>; diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index f80cf7376..4b4579459 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -110,16 +110,16 @@ static inline TaskSpecification ExampleTaskSpec( rpc::TaskSpec message; message.set_language(language); if (!actor_id.IsNil()) { - message.set_type(rpc::TaskType::ACTOR_TASK); + message.set_type(TaskType::ACTOR_TASK); message.mutable_actor_task_spec()->set_actor_id(actor_id.Binary()); } else if (!actor_creation_id.IsNil()) { - message.set_type(rpc::TaskType::ACTOR_CREATION_TASK); + message.set_type(TaskType::ACTOR_CREATION_TASK); message.mutable_actor_creation_task_spec()->set_actor_id(actor_creation_id.Binary()); for (const auto &option : dynamic_worker_options) { message.mutable_actor_creation_task_spec()->add_dynamic_worker_options(option); } } else { - message.set_type(rpc::TaskType::NORMAL_TASK); + message.set_type(TaskType::NORMAL_TASK); } return TaskSpecification(std::move(message)); } diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index b132c66a4..d8f18b0b8 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -4,8 +4,8 @@ #include #include +#include "ray/common/grpc_util.h" #include "ray/common/status.h" -#include "ray/rpc/util.h" namespace ray { namespace rpc { diff --git a/src/ray/rpc/message_wrapper.h b/src/ray/rpc/message_wrapper.h deleted file mode 100644 index 56fb5c53a..000000000 --- a/src/ray/rpc/message_wrapper.h +++ /dev/null @@ -1,45 +0,0 @@ -#ifndef RAY_RPC_WRAPPER_H -#define RAY_RPC_WRAPPER_H - -#include - -namespace ray { - -namespace rpc { - -/// Wrap a protobuf message. -template -class MessageWrapper { - public: - /// Construct an empty message wrapper. This should not be used directly. - MessageWrapper() {} - - /// Construct from a protobuf message object. - /// The input message will be **copied** into this object. - /// - /// \param message The protobuf message. - explicit MessageWrapper(const Message message) : message_(std::move(message)) {} - - /// Construct from protobuf-serialized binary. - /// - /// \param serialized_binary Protobuf-serialized binary. - explicit MessageWrapper(const std::string &serialized_binary) { - message_.ParseFromString(serialized_binary); - } - - /// Get reference of the protobuf message. - const Message &GetMessage() const { return message_; } - - /// Serialize the message to a string. - const std::string Serialize() const { return message_.SerializeAsString(); } - - protected: - /// The wrapped message. - Message message_; -}; - -} // namespace rpc - -} // namespace ray - -#endif // RAY_RPC_WRAPPER_H diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index b2afee0b1..89e7532cd 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -3,8 +3,8 @@ #include +#include "ray/common/grpc_util.h" #include "ray/common/status.h" -#include "ray/rpc/util.h" namespace ray { namespace rpc {