mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 14:53:47 +08:00
C++ worker API refactoring (#9053)
This commit is contained in:
+56
-47
@@ -13,9 +13,17 @@ namespace ray {
|
||||
namespace api {
|
||||
|
||||
template <typename T>
|
||||
class RayObject;
|
||||
class ObjectRef;
|
||||
template <typename T>
|
||||
class RayActor;
|
||||
class ActorHandle;
|
||||
template <typename ReturnType>
|
||||
class TaskCaller;
|
||||
|
||||
template <typename ReturnType>
|
||||
class ActorTaskCaller;
|
||||
|
||||
template <typename ActorType>
|
||||
class ActorCreator;
|
||||
|
||||
class WaitResult;
|
||||
|
||||
@@ -27,9 +35,9 @@ class Ray {
|
||||
/// Store an object in the object store.
|
||||
///
|
||||
/// \param[in] obj The object which should be stored.
|
||||
/// \return RayObject A reference to the object in the object store.
|
||||
/// \return ObjectRef A reference to the object in the object store.
|
||||
template <typename T>
|
||||
static RayObject<T> Put(const T &obj);
|
||||
static ObjectRef<T> Put(const T &obj);
|
||||
|
||||
/// Get a list of objects from the object store.
|
||||
/// This method will be blocked until all the objects are ready.
|
||||
@@ -45,9 +53,9 @@ class Ray {
|
||||
/// \param[in] objects The object array which should be got.
|
||||
/// \return shared pointer array of the result.
|
||||
template <typename T>
|
||||
static std::vector<std::shared_ptr<T>> Get(const std::vector<RayObject<T>> &ids);
|
||||
static std::vector<std::shared_ptr<T>> Get(const std::vector<ObjectRef<T>> &ids);
|
||||
|
||||
/// Wait for a list of RayObjects to be locally available,
|
||||
/// Wait for a list of objects to be locally available,
|
||||
/// until specified number of objects are ready, or specified timeout has passed.
|
||||
///
|
||||
/// \param[in] ids The object id array which should be waited.
|
||||
@@ -61,7 +69,7 @@ class Ray {
|
||||
/// Include the `Call` methods for calling remote functions.
|
||||
#include "api/generated/call_funcs.generated.h"
|
||||
|
||||
/// Include the `CreateActor` methods for creating actors.
|
||||
/// Include the `Actor` methods for creating actors.
|
||||
#include "api/generated/create_actors.generated.h"
|
||||
|
||||
private:
|
||||
@@ -69,72 +77,76 @@ class Ray {
|
||||
|
||||
static std::once_flag is_inited_;
|
||||
|
||||
/// Used by RayObject to implement .Get()
|
||||
/// Used by ObjectRef to implement .Get()
|
||||
template <typename T>
|
||||
static std::shared_ptr<T> Get(const RayObject<T> &object);
|
||||
static std::shared_ptr<T> Get(const ObjectRef<T> &object);
|
||||
|
||||
template <typename ReturnType, typename FuncType, typename ExecFuncType,
|
||||
typename... ArgTypes>
|
||||
static RayObject<ReturnType> CallInternal(FuncType &func, ExecFuncType &exec_func,
|
||||
ArgTypes &... args);
|
||||
static TaskCaller<ReturnType> TaskInternal(FuncType &func, ExecFuncType &exec_func,
|
||||
ArgTypes &... args);
|
||||
|
||||
template <typename ReturnType, typename FuncType, typename ExecFuncType,
|
||||
template <typename ActorType, typename FuncType, typename ExecFuncType,
|
||||
typename... ArgTypes>
|
||||
static RayActor<ReturnType> CreateActorInternal(FuncType &func, ExecFuncType &exec_func,
|
||||
ArgTypes &... args);
|
||||
static ActorCreator<ActorType> CreateActorInternal(FuncType &func,
|
||||
ExecFuncType &exec_func,
|
||||
ArgTypes &... args);
|
||||
|
||||
template <typename ReturnType, typename ActorType, typename FuncType,
|
||||
typename ExecFuncType, typename... ArgTypes>
|
||||
static RayObject<ReturnType> CallActorInternal(FuncType &actor_func,
|
||||
ExecFuncType &exec_func,
|
||||
RayActor<ActorType> &actor,
|
||||
ArgTypes &... args);
|
||||
static ActorTaskCaller<ReturnType> CallActorInternal(FuncType &actor_func,
|
||||
ExecFuncType &exec_func,
|
||||
ActorHandle<ActorType> &actor,
|
||||
ArgTypes &... args);
|
||||
|
||||
/// Include the `Call` methods for calling actor methods.
|
||||
/// Used by RayActor to implement .Call()
|
||||
/// Used by ActorHandle to implement .Call()
|
||||
#include "api/generated/call_actors.generated.h"
|
||||
|
||||
template <typename T>
|
||||
friend class RayObject;
|
||||
friend class ObjectRef;
|
||||
|
||||
template <typename ActorType>
|
||||
friend class RayActor;
|
||||
friend class ActorHandle;
|
||||
};
|
||||
|
||||
} // namespace api
|
||||
} // namespace ray
|
||||
|
||||
// --------- inline implementation ------------
|
||||
#include <ray/api/actor_creator.h>
|
||||
#include <ray/api/actor_handle.h>
|
||||
#include <ray/api/actor_task_caller.h>
|
||||
#include <ray/api/arguments.h>
|
||||
#include <ray/api/ray_actor.h>
|
||||
#include <ray/api/ray_object.h>
|
||||
#include <ray/api/object_ref.h>
|
||||
#include <ray/api/serializer.h>
|
||||
#include <ray/api/task_caller.h>
|
||||
#include <ray/api/wait_result.h>
|
||||
|
||||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
template <typename T>
|
||||
inline static std::vector<ObjectID> RayObjectsToObjectIDs(
|
||||
const std::vector<RayObject<T>> &ray_objects) {
|
||||
inline static std::vector<ObjectID> ObjectRefsToObjectIDs(
|
||||
const std::vector<ObjectRef<T>> &object_refs) {
|
||||
std::vector<ObjectID> object_ids;
|
||||
for (auto it = ray_objects.begin(); it != ray_objects.end(); it++) {
|
||||
for (auto it = object_refs.begin(); it != object_refs.end(); it++) {
|
||||
object_ids.push_back(it->ID());
|
||||
}
|
||||
return object_ids;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
inline RayObject<T> Ray::Put(const T &obj) {
|
||||
inline ObjectRef<T> Ray::Put(const T &obj) {
|
||||
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
|
||||
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
|
||||
Serializer::Serialize(packer, obj);
|
||||
auto id = runtime_->Put(buffer);
|
||||
return RayObject<T>(id);
|
||||
return ObjectRef<T>(id);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
inline std::shared_ptr<T> Ray::Get(const RayObject<T> &object) {
|
||||
inline std::shared_ptr<T> Ray::Get(const ObjectRef<T> &object) {
|
||||
auto packed_object = runtime_->Get(object.ID());
|
||||
msgpack::unpacker unpacker;
|
||||
unpacker.reserve_buffer(packed_object->size());
|
||||
@@ -163,8 +175,8 @@ inline std::vector<std::shared_ptr<T>> Ray::Get(const std::vector<ObjectID> &ids
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
inline std::vector<std::shared_ptr<T>> Ray::Get(const std::vector<RayObject<T>> &ids) {
|
||||
auto object_ids = RayObjectsToObjectIDs<T>(ids);
|
||||
inline std::vector<std::shared_ptr<T>> Ray::Get(const std::vector<ObjectRef<T>> &ids) {
|
||||
auto object_ids = ObjectRefsToObjectIDs<T>(ids);
|
||||
return Get<T>(object_ids);
|
||||
}
|
||||
|
||||
@@ -175,39 +187,37 @@ inline WaitResult Ray::Wait(const std::vector<ObjectID> &ids, int num_objects,
|
||||
|
||||
template <typename ReturnType, typename FuncType, typename ExecFuncType,
|
||||
typename... ArgTypes>
|
||||
inline RayObject<ReturnType> Ray::CallInternal(FuncType &func, ExecFuncType &exec_func,
|
||||
ArgTypes &... args) {
|
||||
inline TaskCaller<ReturnType> Ray::TaskInternal(FuncType &func, ExecFuncType &exec_func,
|
||||
ArgTypes &... args) {
|
||||
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
|
||||
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
|
||||
Arguments::WrapArgs(packer, args...);
|
||||
RemoteFunctionPtrHolder ptr;
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(func);
|
||||
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
|
||||
auto returned_object_id = runtime_->Call(ptr, buffer);
|
||||
return RayObject<ReturnType>(returned_object_id);
|
||||
return TaskCaller<ReturnType>(runtime_, ptr, buffer);
|
||||
}
|
||||
|
||||
template <typename ReturnType, typename FuncType, typename ExecFuncType,
|
||||
template <typename ActorType, typename FuncType, typename ExecFuncType,
|
||||
typename... ArgTypes>
|
||||
inline RayActor<ReturnType> Ray::CreateActorInternal(FuncType &create_func,
|
||||
ExecFuncType &exec_func,
|
||||
ArgTypes &... args) {
|
||||
inline ActorCreator<ActorType> Ray::CreateActorInternal(FuncType &create_func,
|
||||
ExecFuncType &exec_func,
|
||||
ArgTypes &... args) {
|
||||
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
|
||||
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
|
||||
Arguments::WrapArgs(packer, args...);
|
||||
RemoteFunctionPtrHolder ptr;
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(create_func);
|
||||
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
|
||||
auto returned_actor_id = runtime_->CreateActor(ptr, buffer);
|
||||
return RayActor<ReturnType>(returned_actor_id);
|
||||
return ActorCreator<ActorType>(runtime_, ptr, buffer);
|
||||
}
|
||||
|
||||
template <typename ReturnType, typename ActorType, typename FuncType,
|
||||
typename ExecFuncType, typename... ArgTypes>
|
||||
inline RayObject<ReturnType> Ray::CallActorInternal(FuncType &actor_func,
|
||||
ExecFuncType &exec_func,
|
||||
RayActor<ActorType> &actor,
|
||||
ArgTypes &... args) {
|
||||
inline ActorTaskCaller<ReturnType> Ray::CallActorInternal(FuncType &actor_func,
|
||||
ExecFuncType &exec_func,
|
||||
ActorHandle<ActorType> &actor,
|
||||
ArgTypes &... args) {
|
||||
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
|
||||
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
|
||||
Arguments::WrapArgs(packer, args...);
|
||||
@@ -215,8 +225,7 @@ inline RayObject<ReturnType> Ray::CallActorInternal(FuncType &actor_func,
|
||||
MemberFunctionPtrHolder holder = *(MemberFunctionPtrHolder *)(&actor_func);
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(holder.value[0]);
|
||||
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
|
||||
auto returned_object_id = runtime_->CallActor(ptr, actor.ID(), buffer);
|
||||
return RayObject<ReturnType>(returned_object_id);
|
||||
return ActorTaskCaller<ReturnType>(runtime_, actor.ID(), ptr, buffer);
|
||||
}
|
||||
|
||||
#include <ray/api/generated/exec_funcs.generated.h>
|
||||
|
||||
Reference in New Issue
Block a user