diff --git a/.travis.yml b/.travis.yml index 782dbceef..0defa3691 100644 --- a/.travis.yml +++ b/.travis.yml @@ -298,6 +298,20 @@ matrix: - if [ $RAY_CI_RLLIB_FULL_AFFECTED != "1" ]; then exit; fi - ./ci/keep_alive bazel test --build_tests_only --test_tag_filters=tests_dir_J,tests_dir_K,tests_dir_L,tests_dir_M,tests_dir_N,tests_dir_O,tests_dir_P,tests_dir_Q,tests_dir_R,tests_dir_S,tests_dir_T,tests_dir_U,tests_dir_V,tests_dir_W,tests_dir_X,tests_dir_Y,tests_dir_Z --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors rllib/... + # Cpp worker test + - os: linux + env: + - TESTSUITE=cpp_worker + - PYTHON=3.6 + install: + - eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py` + - ./ci/travis/install-bazel.sh + - ./ci/suppress_output ./ci/travis/install-dependencies.sh + - export PATH="$HOME/miniconda/bin:$PATH" + - ./ci/suppress_output ./ci/travis/install-ray.sh + script: + - bazel test //cpp:all --build_tests_only --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=streamed + install: - eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py` - if [ $RAY_CI_SERVE_AFFECTED != "1" ] && [ $RAY_CI_TUNE_AFFECTED != "1" ] && [ $RAY_CI_RLLIB_AFFECTED != "1" ] && [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi diff --git a/BUILD.bazel b/BUILD.bazel index 7336b5ee6..0a93cdbc7 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -270,6 +270,7 @@ cc_library( "@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/container:flat_hash_set", "@com_google_googletest//:gtest", + "@msgpack", "@plasma//:plasma_client", ], ) diff --git a/bazel/BUILD.msgpack b/bazel/BUILD.msgpack new file mode 100644 index 000000000..52800d952 --- /dev/null +++ b/bazel/BUILD.msgpack @@ -0,0 +1,59 @@ +filegroup( + name = "c_headers", + srcs = glob([ + "include/*.h", + "include/**/*.h", + "include/**/**/*.h", + "include/**/**/**/*.h", + ], + exclude = [ + "include/msgpack.h", + ], + ), +) + +filegroup( + name = "cxx_headers", + srcs = glob([ + "include/*.hpp", + "include/**/*.hpp", + "include/**/**/*.hpp", + "include/**/**/**/*.hpp", + ], + exclude = [ + "include/msgpack.hpp", + ], + ), +) + +filegroup( + name = "source_files", + srcs = [ + "src/objectc.c", + "src/unpack.c", + "src/version.c", + "src/vrefbuffer.c", + "src/zone.c", + ], +) + +cc_library( + name = "msgpack", + srcs = [ + ":c_headers", + ":cxx_headers", + ":source_files", + ], + hdrs = [ + "include/msgpack.h", + "include/msgpack.hpp", + ], + includes = [ + "include", + ], + strip_include_prefix = "include", + copts = [ + #"-std=c++11", + ], + visibility = ["//visibility:public"], +) \ No newline at end of file diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 3769e03df..de011fc35 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -234,3 +234,10 @@ def ray_deps_setup(): url = "https://github.com/rules-proto-grpc/rules_proto_grpc/archive/a74fef39c5fe636580083545f76d1eab74f6450d.tar.gz", sha256 = "2f6606151ec042e23396f07de9e7dcf6ca9a5db1d2b09f0cc93a7fc7f4008d1b", ) + + auto_http_archive( + name = "msgpack", + build_file = True, + url = "https://github.com/msgpack/msgpack-c/archive/8085ab8721090a447cf98bb802d1406ad7afe420.tar.gz", + sha256 = "83c37c9ad926bbee68d564d9f53c6cbb057c1f755c264043ddd87d89e36d15bb", + ) diff --git a/ci/travis/bazel-format.sh b/ci/travis/bazel-format.sh index 817ac219f..7a11cafb2 100755 --- a/ci/travis/bazel-format.sh +++ b/ci/travis/bazel-format.sh @@ -45,6 +45,6 @@ done pushd $ROOT_DIR/../.. BAZEL_FILES="bazel/BUILD bazel/BUILD.plasma bazel/ray.bzl BUILD.bazel java/BUILD.bazel - streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE" + cpp/BUILD.bazel streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE" buildifier -mode=$RUN_TYPE -diff_command="diff -u" $BAZEL_FILES popd diff --git a/cpp/BUILD.bazel b/cpp/BUILD.bazel new file mode 100644 index 000000000..5971b1d41 --- /dev/null +++ b/cpp/BUILD.bazel @@ -0,0 +1,87 @@ +# Bazel build +# C/C++ documentation: https://docs.bazel.build/versions/master/be/c-cpp.html + +load("//bazel:ray.bzl", "COPTS") + +cc_library( + name = "libray_api_header", + hdrs = glob([ + "include/ray/*.h", + "include/ray/**/*.h", + "include/ray/**/**/*.h", + ]), + copts = COPTS, + strip_include_prefix = "include", + deps = [ + "//:core_worker_lib", + "//:ray_common", + "@msgpack", + ], +) + +cc_binary( + name = "libray_api.so", + srcs = glob([ + "src/ray/api.cc", + "src/ray/api/*.cc", + "src/ray/api/*.h", + "src/ray/app/*.cc", + "src/ray/app/*.h", + "src/ray/runtime/*.cc", + "src/ray/runtime/*.h", + "src/ray/runtime/**/*.cc", + "src/ray/runtime/**/*.h", + "src/ray/runtime/task/*.cc", + "src/ray/runtime/task/*.h", + "src/ray/util/*.cc", + "src/ray/util/*.h", + "src/ray/*.cc", + "src/ray/*.h", + ]), + copts = COPTS, + linkopts = ["-ldl"], + linkshared = 1, + linkstatic = False, + visibility = ["//visibility:public"], + deps = [ + "libray_api_header", + "//:core_worker_lib", + "//:ray_common", + "//:ray_util", + "@boost//:asio", + "@boost//:thread", + "@com_google_absl//absl/synchronization", + "@msgpack", + ], +) + +cc_import( + name = "ray_api", + shared_library = "libray_api.so", +) + +cc_binary( + name = "example", + testonly = 1, + srcs = [ + "src/example/example.cc", + ], + copts = COPTS, + deps = [ + "libray_api_header", + "ray_api", + ], +) + +cc_test( + name = "api_test", + srcs = glob([ + "src/ray/test/*.cc", + ]), + copts = COPTS, + deps = [ + "libray_api_header", + "ray_api", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/cpp/include/ray/api.h b/cpp/include/ray/api.h new file mode 100644 index 000000000..8d1952d71 --- /dev/null +++ b/cpp/include/ray/api.h @@ -0,0 +1,231 @@ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include "ray/core.h" +namespace ray { +namespace api { + +template +class RayObject; +template +class RayActor; + +class WaitResult; + +class Ray { + public: + /// Initialize Ray runtime. + static void Init(); + + /// Store an object in the object store. + /// + /// \param[in] obj The object which should be stored. + /// \return RayObject A reference to the object in the object store. + template + static RayObject Put(const T &obj); + + /// Get a list of objects from the object store. + /// This method will be blocked until all the objects are ready. + /// + /// \param[in] ids The object id array which should be got. + /// \return shared pointer array of the result. + template + static std::vector> Get(const std::vector &ids); + + /// Get a list of objects from the object store. + /// This method will be blocked until all the objects are ready. + /// + /// \param[in] objects The object array which should be got. + /// \return shared pointer array of the result. + template + static std::vector> Get(const std::vector> &ids); + + /// Wait for a list of RayObjects to be locally available, + /// until specified number of objects are ready, or specified timeout has passed. + /// + /// \param[in] ids The object id array which should be waited. + /// \param[in] num_objects The minimum number of objects to wait. + /// \param[in] timeout_ms The maximum wait time in milliseconds. + /// \return Two arrays, one containing locally available objects, one containing the + /// rest. + static WaitResult Wait(const std::vector &ids, int num_objects, + int timeout_ms); + +/// Include the `Call` methods for calling remote functions. +#include "api/generated/call_funcs.generated.h" + +/// Include the `CreateActor` methods for creating actors. +#include "api/generated/create_actors.generated.h" + + private: + static RayRuntime *runtime_; + + static std::once_flag is_inited_; + + /// Used by RayObject to implement .Get() + template + static std::shared_ptr Get(const RayObject &object); + + template + static RayObject CallInternal(FuncType &func, ExecFuncType &exec_func, + ArgTypes &... args); + + template + static RayActor CreateActorInternal(FuncType &func, ExecFuncType &exec_func, + ArgTypes &... args); + + template + static RayObject CallActorInternal(FuncType &actor_func, + ExecFuncType &exec_func, + RayActor &actor, + ArgTypes &... args); + +/// Include the `Call` methods for calling actor methods. +/// Used by RayActor to implement .Call() +#include "api/generated/call_actors.generated.h" + + template + friend class RayObject; + + template + friend class RayActor; +}; + +} // namespace api +} // namespace ray + +// --------- inline implementation ------------ +#include +#include +#include +#include +#include + +namespace ray { +namespace api { + +template +inline static std::vector RayObjectsToObjectIDs( + const std::vector> &ray_objects) { + std::vector object_ids; + for (auto it = ray_objects.begin(); it != ray_objects.end(); it++) { + object_ids.push_back(it->ID()); + } + return object_ids; +} + +template +inline RayObject Ray::Put(const T &obj) { + std::shared_ptr buffer(new msgpack::sbuffer()); + msgpack::packer packer(buffer.get()); + Serializer::Serialize(packer, obj); + auto id = runtime_->Put(buffer); + return RayObject(id); +} + +template +inline std::shared_ptr Ray::Get(const RayObject &object) { + auto packed_object = runtime_->Get(object.ID()); + msgpack::unpacker unpacker; + unpacker.reserve_buffer(packed_object->size()); + memcpy(unpacker.buffer(), packed_object->data(), packed_object->size()); + unpacker.buffer_consumed(packed_object->size()); + std::shared_ptr return_object(new T); + Serializer::Deserialize(unpacker, return_object.get()); + return return_object; +} + +template +inline std::vector> Ray::Get(const std::vector &ids) { + auto result = runtime_->Get(ids); + std::vector> return_objects; + return_objects.reserve(result.size()); + for (auto it = result.begin(); it != result.end(); it++) { + msgpack::unpacker unpacker; + unpacker.reserve_buffer((*it)->size()); + memcpy(unpacker.buffer(), (*it)->data(), (*it)->size()); + unpacker.buffer_consumed((*it)->size()); + std::shared_ptr obj(new T); + Serializer::Deserialize(unpacker, obj.get()); + return_objects.push_back(obj); + } + return return_objects; +} + +template +inline std::vector> Ray::Get(const std::vector> &ids) { + auto object_ids = RayObjectsToObjectIDs(ids); + return Get(object_ids); +} + +inline WaitResult Ray::Wait(const std::vector &ids, int num_objects, + int timeout_ms) { + return runtime_->Wait(ids, num_objects, timeout_ms); +} + +template +inline RayObject Ray::CallInternal(FuncType &func, ExecFuncType &exec_func, + ArgTypes &... args) { + std::shared_ptr buffer(new msgpack::sbuffer()); + msgpack::packer packer(buffer.get()); + Arguments::WrapArgs(packer, args...); + RemoteFunctionPtrHolder ptr; + ptr.function_pointer = reinterpret_cast(func); + ptr.exec_function_pointer = reinterpret_cast(exec_func); + auto returned_object_id = runtime_->Call(ptr, buffer); + return RayObject(returned_object_id); +} + +template +inline RayActor Ray::CreateActorInternal(FuncType &create_func, + ExecFuncType &exec_func, + ArgTypes &... args) { + std::shared_ptr buffer(new msgpack::sbuffer()); + msgpack::packer packer(buffer.get()); + Arguments::WrapArgs(packer, args...); + RemoteFunctionPtrHolder ptr; + ptr.function_pointer = reinterpret_cast(create_func); + ptr.exec_function_pointer = reinterpret_cast(exec_func); + auto returned_actor_id = runtime_->CreateActor(ptr, buffer); + return RayActor(returned_actor_id); +} + +template +inline RayObject Ray::CallActorInternal(FuncType &actor_func, + ExecFuncType &exec_func, + RayActor &actor, + ArgTypes &... args) { + std::shared_ptr buffer(new msgpack::sbuffer()); + msgpack::packer packer(buffer.get()); + Arguments::WrapArgs(packer, args...); + RemoteFunctionPtrHolder ptr; + MemberFunctionPtrHolder holder = *(MemberFunctionPtrHolder *)(&actor_func); + ptr.function_pointer = reinterpret_cast(holder.value[0]); + ptr.exec_function_pointer = reinterpret_cast(exec_func); + auto returned_object_id = runtime_->CallActor(ptr, actor.ID(), buffer); + return RayObject(returned_object_id); +} + +#include + +#include + +#include + +#include + +} // namespace api +} // namespace ray diff --git a/cpp/include/ray/api/arguments.h b/cpp/include/ray/api/arguments.h new file mode 100644 index 000000000..1642404a5 --- /dev/null +++ b/cpp/include/ray/api/arguments.h @@ -0,0 +1,84 @@ + +#pragma once + +#include +#include + +namespace ray { +namespace api { + +class Arguments { + public: + static void WrapArgs(msgpack::packer &packer); + + template + static void WrapArgs(msgpack::packer &packer, Arg1Type &arg1); + + template + static void WrapArgs(msgpack::packer &packer, Arg1Type &arg1, + OtherArgTypes &... args); + + static void UnwrapArgs(msgpack::unpacker &unpacker); + + template + static void UnwrapArgs(msgpack::unpacker &unpacker, std::shared_ptr *arg1); + + template + static void UnwrapArgs(msgpack::unpacker &unpacker, std::shared_ptr *arg1, + std::shared_ptr *... args); +}; + +// --------- inline implementation ------------ +#include + +inline void Arguments::WrapArgs(msgpack::packer &packer) {} + +template +inline void Arguments::WrapArgs(msgpack::packer &packer, + Arg1Type &arg1) { + /// Notice RayObjectClassPrefix should be modified by RayObject class name or namespace. + static const std::string RayObjectClassPrefix = "N3ray3api9RayObject"; + std::string type_name = typeid(arg1).name(); + if (type_name.rfind(RayObjectClassPrefix, 0) == 0) { + /// Pass by reference. + Serializer::Serialize(packer, true); + } else { + /// Pass by value. + Serializer::Serialize(packer, false); + } + Serializer::Serialize(packer, arg1); +} + +template +inline void Arguments::WrapArgs(msgpack::packer &packer, Arg1Type &arg1, + OtherArgTypes &... args) { + WrapArgs(packer, arg1); + WrapArgs(packer, args...); +} + +inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker) {} + +template +inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker, + std::shared_ptr *arg1) { + bool is_ray_object; + Serializer::Deserialize(unpacker, &is_ray_object); + if (is_ray_object) { + RayObject ray_object; + Serializer::Deserialize(unpacker, &ray_object); + *arg1 = ray_object.Get(); + } else { + Serializer::Deserialize(unpacker, arg1); + } +} + +template +inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker, + std::shared_ptr *arg1, + std::shared_ptr *... args) { + UnwrapArgs(unpacker, arg1); + UnwrapArgs(unpacker, args...); +} + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/include/ray/api/generated/actor_call.generated.h b/cpp/include/ray/api/generated/actor_call.generated.h new file mode 100644 index 000000000..4ba63903f --- /dev/null +++ b/cpp/include/ray/api/generated/actor_call.generated.h @@ -0,0 +1,41 @@ +/// This file is auto-generated. DO NOT EDIT. +/// The following `Call` methods are used to call remote functions of actors. +/// Their arguments and return types are as following: +/// \param[in] actor_func The function pointer to be remote execution. +/// \param[in] arg1...argn The function arguments passed by a value or RayObject. +/// \return RayObject. + +// TODO(Guyang Song): code generation + +// 0 args +template +RayObject Call(ActorFunc0 actor_func); +// 1 arg +template +RayObject Call(ActorFunc1 actor_func, + Arg1Type arg1); + +template +RayObject Call(ActorFunc1 actor_func, + RayObject &arg1); + +// 2 args +template +RayObject Call( + ActorFunc2 actor_func, Arg1Type arg1, + Arg2Type arg2); + +template +RayObject Call( + ActorFunc2 actor_func, + RayObject &arg1, Arg2Type arg2); + +template +RayObject Call( + ActorFunc2 actor_func, Arg1Type arg1, + RayObject &arg2); + +template +RayObject Call( + ActorFunc2 actor_func, + RayObject &arg1, RayObject &arg2); diff --git a/cpp/include/ray/api/generated/actor_call_impl.generated.h b/cpp/include/ray/api/generated/actor_call_impl.generated.h new file mode 100644 index 000000000..f7861baed --- /dev/null +++ b/cpp/include/ray/api/generated/actor_call_impl.generated.h @@ -0,0 +1,57 @@ +// TODO(Guyang Song): code generation + +// 0 args +template +template +RayObject RayActor::Call( + ActorFunc0 actor_func) { + return Ray::Call(actor_func, *this); +} + +// 1 arg +template +template +RayObject RayActor::Call( + ActorFunc1 actor_func, Arg1Type arg1) { + return Ray::Call(actor_func, *this, arg1); +} + +template +template +RayObject RayActor::Call( + ActorFunc1 actor_func, RayObject &arg1) { + return Ray::Call(actor_func, *this, arg1); +} + +// 2 args +template +template +RayObject RayActor::Call( + ActorFunc2 actor_func, Arg1Type arg1, + Arg2Type arg2) { + return Ray::Call(actor_func, *this, arg1, arg2); +} + +template +template +RayObject RayActor::Call( + ActorFunc2 actor_func, + RayObject &arg1, Arg2Type arg2) { + return Ray::Call(actor_func, *this, arg1, arg2); +} + +template +template +RayObject RayActor::Call( + ActorFunc2 actor_func, Arg1Type arg1, + RayObject &arg2) { + return Ray::Call(actor_func, *this, arg1, arg2); +} + +template +template +RayObject RayActor::Call( + ActorFunc2 actor_func, + RayObject &arg1, RayObject &arg2) { + return Ray::Call(actor_func, *this, arg1, arg2); +} diff --git a/cpp/include/ray/api/generated/actor_funcs.generated.h b/cpp/include/ray/api/generated/actor_funcs.generated.h new file mode 100644 index 000000000..c86c84cdc --- /dev/null +++ b/cpp/include/ray/api/generated/actor_funcs.generated.h @@ -0,0 +1,15 @@ + + +// TODO(Guyang Song): code generation + +// 0 args +template +using ActorFunc0 = ReturnType (ActorType::*)(); + +// 1 arg +template +using ActorFunc1 = ReturnType (ActorType::*)(Arg1Type); + +// 2 args +template +using ActorFunc2 = ReturnType (ActorType::*)(Arg1Type, Arg2Type); diff --git a/cpp/include/ray/api/generated/call_actors.generated.h b/cpp/include/ray/api/generated/call_actors.generated.h new file mode 100644 index 000000000..cd7022c5d --- /dev/null +++ b/cpp/include/ray/api/generated/call_actors.generated.h @@ -0,0 +1,38 @@ + + +// TODO(Guyang Song): code generation + +// 0 args +template +static RayObject Call(ActorFunc0 actor_func, + RayActor &actor); + +// 1 arg +template +static RayObject Call(ActorFunc1 actor_func, + RayActor &actor, Arg1Type arg1); + +template +static RayObject Call(ActorFunc1 actor_func, + RayActor &actor, RayObject &arg1); + +// 2 args +template +static RayObject Call( + ActorFunc2 actor_func, + RayActor &actor, Arg1Type arg1, Arg2Type arg2); + +template +static RayObject Call( + ActorFunc2 actor_func, + RayActor &actor, RayObject &arg1, Arg2Type arg2); + +template +static RayObject Call( + ActorFunc2 actor_func, + RayActor &actor, Arg1Type arg1, RayObject &arg2); + +template +static RayObject Call( + ActorFunc2 actor_func, + RayActor &actor, RayObject &arg1, RayObject &arg2); diff --git a/cpp/include/ray/api/generated/call_actors_impl.generated.h b/cpp/include/ray/api/generated/call_actors_impl.generated.h new file mode 100644 index 000000000..849981231 --- /dev/null +++ b/cpp/include/ray/api/generated/call_actors_impl.generated.h @@ -0,0 +1,60 @@ +// TODO(Guyang Song): code generation +// 0 args +template +RayObject Ray::Call(ActorFunc0 actor_func, + RayActor &actor) { + return CallActorInternal( + actor_func, ActorExecFunction, actor); +} + +// 1 arg +template +RayObject Ray::Call(ActorFunc1 actor_func, + RayActor &actor, Arg1Type arg1) { + return CallActorInternal( + actor_func, ActorExecFunction, actor, arg1); +} + +template +RayObject Ray::Call(ActorFunc1 actor_func, + RayActor &actor, RayObject &arg1) { + return CallActorInternal( + actor_func, ActorExecFunction, actor, arg1); +} + +// 2 args +template +RayObject Ray::Call( + ActorFunc2 actor_func, + RayActor &actor, Arg1Type arg1, Arg2Type arg2) { + return CallActorInternal( + actor_func, ActorExecFunction, actor, + arg1, arg2); +} + +template +RayObject Ray::Call( + ActorFunc2 actor_func, + RayActor &actor, RayObject &arg1, Arg2Type arg2) { + return CallActorInternal( + actor_func, ActorExecFunction, actor, + arg1, arg2); +} + +template +RayObject Ray::Call( + ActorFunc2 actor_func, + RayActor &actor, Arg1Type arg1, RayObject &arg2) { + return CallActorInternal( + actor_func, ActorExecFunction, actor, + arg1, arg2); +} + +template +RayObject Ray::Call( + ActorFunc2 actor_func, + RayActor &actor, RayObject &arg1, RayObject &arg2) { + return CallActorInternal( + actor_func, ActorExecFunction, actor, + arg1, arg2); +} diff --git a/cpp/include/ray/api/generated/call_funcs.generated.h b/cpp/include/ray/api/generated/call_funcs.generated.h new file mode 100644 index 000000000..6850bc289 --- /dev/null +++ b/cpp/include/ray/api/generated/call_funcs.generated.h @@ -0,0 +1,37 @@ +/// This file is auto-generated. DO NOT EDIT. +/// The following `Call` methods are used to call remote functions. +/// Their arguments and return types are as following: +/// \param[in] func The function pointer to be remote execution. +/// \param[in] arg1...argn The function arguments passed by a value or RayObject. +/// \return RayObject. + +// TODO(Guyang Song): code generation + +// 0 args +template +static RayObject Call(Func0 func); + +// 1 arg +template +static RayObject Call(Func1 func, Arg1Type arg1); + +template +static RayObject Call(Func1 func, + RayObject &arg1); + +// 2 args +template +static RayObject Call(Func2 func, + Arg1Type arg1, Arg2Type arg2); + +template +static RayObject Call(Func2 func, + RayObject &arg1, Arg2Type arg2); + +template +static RayObject Call(Func2 func, + Arg1Type arg1, RayObject &arg2); + +template +static RayObject Call(Func2 func, + RayObject &arg1, RayObject &arg2); \ No newline at end of file diff --git a/cpp/include/ray/api/generated/call_funcs_impl.generated.h b/cpp/include/ray/api/generated/call_funcs_impl.generated.h new file mode 100644 index 000000000..b9c8d6f71 --- /dev/null +++ b/cpp/include/ray/api/generated/call_funcs_impl.generated.h @@ -0,0 +1,48 @@ +// TODO(Guyang Song): code generation + +// 0 args +template +RayObject Ray::Call(Func0 func) { + return CallInternal(func, NormalExecFunction); +} + +// 1 arg +template +RayObject Ray::Call(Func1 func, Arg1Type arg1) { + return CallInternal(func, NormalExecFunction, arg1); +} + +template +RayObject Ray::Call(Func1 func, + RayObject &arg1) { + return CallInternal(func, NormalExecFunction, arg1); +} + +// 2 args +template +RayObject Ray::Call(Func2 func, Arg1Type arg1, + Arg2Type arg2) { + return CallInternal( + func, NormalExecFunction, arg1, arg2); +} + +template +RayObject Ray::Call(Func2 func, + RayObject &arg1, Arg2Type arg2) { + return CallInternal( + func, NormalExecFunction, arg1, arg2); +} + +template +RayObject Ray::Call(Func2 func, Arg1Type arg1, + RayObject &arg2) { + return CallInternal( + func, NormalExecFunction, arg1, arg2); +} + +template +RayObject Ray::Call(Func2 func, + RayObject &arg1, RayObject &arg2) { + return CallInternal( + func, NormalExecFunction, arg1, arg2); +} \ No newline at end of file diff --git a/cpp/include/ray/api/generated/create_actors.generated.h b/cpp/include/ray/api/generated/create_actors.generated.h new file mode 100644 index 000000000..1bf43d592 --- /dev/null +++ b/cpp/include/ray/api/generated/create_actors.generated.h @@ -0,0 +1,42 @@ +/// This file is auto-generated. DO NOT EDIT. +/// The following `Call` methods are used to call remote functions and create an actor. +/// Their arguments and return types are as following: +/// \param[in] create_func The function pointer to be remote execution. +/// \param[in] arg1...argn The function arguments passed by a value or RayObject. +/// \return RayActor. + +// TODO(Guyang Song): code generation + +// 0 args +template +static RayActor CreateActor(CreateActorFunc0 create_func); + +// 1 arg +template +static RayActor CreateActor( + CreateActorFunc1 create_func, Arg1Type arg1); + +template +static RayActor CreateActor( + CreateActorFunc1 create_func, RayObject &arg1); + +// 2 args +template +static RayActor CreateActor( + CreateActorFunc2 create_func, Arg1Type arg1, + Arg2Type arg2); + +template +static RayActor CreateActor( + CreateActorFunc2 create_func, + RayObject &arg1, Arg2Type arg2); + +template +static RayActor CreateActor( + CreateActorFunc2 create_func, Arg1Type arg1, + RayObject &arg2); + +template +static RayActor CreateActor( + CreateActorFunc2 create_func, + RayObject &arg1, RayObject &arg2); \ No newline at end of file diff --git a/cpp/include/ray/api/generated/create_actors_impl.generated.h b/cpp/include/ray/api/generated/create_actors_impl.generated.h new file mode 100644 index 000000000..6d096a0a7 --- /dev/null +++ b/cpp/include/ray/api/generated/create_actors_impl.generated.h @@ -0,0 +1,56 @@ +// TODO(Guyang Song): code generation + +// 0 args +template +RayActor Ray::CreateActor(CreateActorFunc0 create_func) { + return CreateActorInternal(create_func, + CreateActorExecFunction); +} + +// 1 arg +template +RayActor Ray::CreateActor(CreateActorFunc1 create_func, + Arg1Type arg1) { + return CreateActorInternal( + create_func, CreateActorExecFunction, arg1); +} + +template +RayActor Ray::CreateActor(CreateActorFunc1 create_func, + RayObject &arg1) { + return CreateActorInternal( + create_func, CreateActorExecFunction, arg1); +} + +// 2 args +template +RayActor Ray::CreateActor( + CreateActorFunc2 create_func, Arg1Type arg1, + Arg2Type arg2) { + return CreateActorInternal( + create_func, CreateActorExecFunction, arg1, arg2); +} + +template +RayActor Ray::CreateActor( + CreateActorFunc2 create_func, + RayObject &arg1, Arg2Type arg2) { + return CreateActorInternal( + create_func, CreateActorExecFunction, arg1, arg2); +} + +template +RayActor Ray::CreateActor( + CreateActorFunc2 create_func, Arg1Type arg1, + RayObject &arg2) { + return CreateActorInternal( + create_func, CreateActorExecFunction, arg1, arg2); +} + +template +RayActor Ray::CreateActor( + CreateActorFunc2 create_func, + RayObject &arg1, RayObject &arg2) { + return CreateActorInternal( + create_func, CreateActorExecFunction, arg1, arg2); +} \ No newline at end of file diff --git a/cpp/include/ray/api/generated/create_funcs.generated.h b/cpp/include/ray/api/generated/create_funcs.generated.h new file mode 100644 index 000000000..75b604e62 --- /dev/null +++ b/cpp/include/ray/api/generated/create_funcs.generated.h @@ -0,0 +1,14 @@ + +// TODO(Guyang Song): code generation + +// 0 args +template +using CreateActorFunc0 = ReturnType *(*)(); + +// 1 arg +template +using CreateActorFunc1 = ReturnType *(*)(Arg1Type); + +// 2 args +template +using CreateActorFunc2 = ReturnType *(*)(Arg1Type, Arg2Type); diff --git a/cpp/include/ray/api/generated/exec_funcs.generated.h b/cpp/include/ray/api/generated/exec_funcs.generated.h new file mode 100644 index 000000000..0b218824f --- /dev/null +++ b/cpp/include/ray/api/generated/exec_funcs.generated.h @@ -0,0 +1,159 @@ +/// This file is auto-generated. DO NOT EDIT. +/// The following execution functions are wrappers of remote functions. +/// Execution functions make remote functions executable in distributed system. +/// NormalExecFunction the wrapper of normal remote function. +/// CreateActorExecFunction the wrapper of actor creation function. +/// ActorExecFunction the wrapper of actor member function. + +// TODO(Guyang Song): code generation + +template +std::shared_ptr ExecuteNormalFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer, TaskType task_type, + std::shared_ptr &... args) { + msgpack::unpacker unpacker; + unpacker.reserve_buffer(args_buffer->size()); + memcpy(unpacker.buffer(), args_buffer->data(), args_buffer->size()); + unpacker.buffer_consumed(args_buffer->size()); + Arguments::UnwrapArgs(unpacker, &args...); + + ReturnType return_value; + typedef ReturnType (*Func)(OtherArgTypes...); + Func func = (Func)(base_addr + func_offset); + return_value = (*func)(*args...); + + std::shared_ptr returnBuffer(new msgpack::sbuffer()); + msgpack::packer packer(returnBuffer.get()); + Serializer::Serialize(packer, (CastReturnType)(return_value)); + + return returnBuffer; +} + +template +std::shared_ptr ExecuteActorFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer, + std::shared_ptr &actor_buffer, + std::shared_ptr &... args) { + msgpack::unpacker actor_unpacker; + actor_unpacker.reserve_buffer(actor_buffer->size()); + memcpy(actor_unpacker.buffer(), actor_buffer->data(), actor_buffer->size()); + actor_unpacker.buffer_consumed(actor_buffer->size()); + uintptr_t actor_ptr; + Serializer::Deserialize(actor_unpacker, &actor_ptr); + ActorType *actor_object = (ActorType *)actor_ptr; + + msgpack::unpacker unpacker; + unpacker.reserve_buffer(args_buffer->size()); + memcpy(unpacker.buffer(), args_buffer->data(), args_buffer->size()); + unpacker.buffer_consumed(args_buffer->size()); + Arguments::UnwrapArgs(unpacker, &args...); + + ReturnType return_value; + typedef ReturnType (ActorType::*Func)(OtherArgTypes...); + MemberFunctionPtrHolder holder; + holder.value[0] = base_addr + func_offset; + holder.value[1] = 0; + Func func = *((Func *)&holder); + return_value = (actor_object->*func)(*args...); + + std::shared_ptr returnBuffer(new msgpack::sbuffer()); + msgpack::packer packer(returnBuffer.get()); + Serializer::Serialize(packer, return_value); + return returnBuffer; +} + +// 0 args +template +std::shared_ptr NormalExecFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer) { + return ExecuteNormalFunction( + base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK); +} + +// 1 arg +template +std::shared_ptr NormalExecFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer) { + std::shared_ptr arg1_ptr; + return ExecuteNormalFunction( + base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK, arg1_ptr); +} + +// 2 args +template +std::shared_ptr NormalExecFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer) { + std::shared_ptr arg1_ptr; + std::shared_ptr arg2_ptr; + return ExecuteNormalFunction( + base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK, arg1_ptr, arg2_ptr); +} + +// 0 args +template +std::shared_ptr CreateActorExecFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer) { + return ExecuteNormalFunction(base_addr, func_offset, args_buffer, + TaskType::ACTOR_CREATION_TASK); +} + +// 1 arg +template +std::shared_ptr CreateActorExecFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer) { + std::shared_ptr arg1_ptr; + return ExecuteNormalFunction( + base_addr, func_offset, args_buffer, TaskType::ACTOR_CREATION_TASK, arg1_ptr); +} + +// 2 args +template +std::shared_ptr CreateActorExecFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer) { + std::shared_ptr arg1_ptr; + std::shared_ptr arg2_ptr; + return ExecuteNormalFunction(base_addr, func_offset, args_buffer, + TaskType::ACTOR_CREATION_TASK, + arg1_ptr, arg2_ptr); +} + +// 0 args +template +std::shared_ptr ActorExecFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer, + std::shared_ptr &actor_buffer) { + return ExecuteActorFunction(base_addr, func_offset, args_buffer, + actor_buffer); +} + +// 1 arg +template +std::shared_ptr ActorExecFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer, + std::shared_ptr &actor_buffer) { + std::shared_ptr arg1_ptr; + return ExecuteActorFunction(base_addr, func_offset, args_buffer, + actor_buffer, arg1_ptr); +} + +// 2 args +template +std::shared_ptr ActorExecFunction( + uintptr_t base_addr, size_t func_offset, + std::shared_ptr &args_buffer, + std::shared_ptr &actor_buffer) { + std::shared_ptr arg1_ptr; + std::shared_ptr arg2_ptr; + return ExecuteActorFunction(base_addr, func_offset, args_buffer, + actor_buffer, arg1_ptr, arg2_ptr); +} diff --git a/cpp/include/ray/api/generated/funcs.generated.h b/cpp/include/ray/api/generated/funcs.generated.h new file mode 100644 index 000000000..ff8671860 --- /dev/null +++ b/cpp/include/ray/api/generated/funcs.generated.h @@ -0,0 +1,15 @@ + + +// TODO(Guyang Song): code generation + +// 0 args +template +using Func0 = ReturnType (*)(); + +// 1 arg +template +using Func1 = ReturnType (*)(Arg1Type); + +// 2 args +template +using Func2 = ReturnType (*)(Arg1Type, Arg2Type); diff --git a/cpp/include/ray/api/ray_actor.h b/cpp/include/ray/api/ray_actor.h new file mode 100644 index 000000000..c609e7fe9 --- /dev/null +++ b/cpp/include/ray/api/ray_actor.h @@ -0,0 +1,52 @@ + +#pragma once + +#include "ray/core.h" + +namespace ray { +namespace api { + +#include + +/// A handle to an actor which can be used to invoke a remote actor method, with the +/// `Call` method. +/// \param ActorType The type of the concrete actor class. +/// Note, the `Call` method is defined in actor_call.generated.h. +template +class RayActor { + public: + RayActor(); + + RayActor(const ActorID &id); + + /// Get a untyped ID of the actor + const ActorID &ID() const; + + /// Include the `Call` methods for calling remote functions. +#include + + /// Make RayActor serializable + MSGPACK_DEFINE(id_); + + private: + ActorID id_; +}; + +// ---------- implementation ---------- + +template +RayActor::RayActor() {} + +template +RayActor::RayActor(const ActorID &id) { + id_ = id; +} + +template +const ActorID &RayActor::ID() const { + return id_; +} + +#include +} // namespace api +} // namespace ray diff --git a/cpp/include/ray/api/ray_config.h b/cpp/include/ray/api/ray_config.h new file mode 100644 index 000000000..3f5528f3f --- /dev/null +++ b/cpp/include/ray/api/ray_config.h @@ -0,0 +1,20 @@ + +#pragma once + +namespace ray { +namespace api { + +enum class RunMode { SINGLE_PROCESS, SINGLE_BOX, CLUSTER }; + +enum class WorkerMode { NONE, DRIVER, WORKER }; + +/// TODO(Guyang Song): Make configuration complete and use to initialize. +class RayConfig { + public: + WorkerMode workerMode = WorkerMode::DRIVER; + + RunMode runMode = RunMode::SINGLE_PROCESS; +}; + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/include/ray/api/ray_exception.h b/cpp/include/ray/api/ray_exception.h new file mode 100644 index 000000000..3c6117b68 --- /dev/null +++ b/cpp/include/ray/api/ray_exception.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +namespace ray { +namespace api { + +class RayException : public std::exception { + public: + RayException(const std::string &msg) : msg_(msg){}; + + const char *what() const noexcept override { return msg_.c_str(); }; + + std::string msg_; +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/include/ray/api/ray_object.h b/cpp/include/ray/api/ray_object.h new file mode 100644 index 000000000..aeb9d24a6 --- /dev/null +++ b/cpp/include/ray/api/ray_object.h @@ -0,0 +1,67 @@ + +#pragma once + +#include +#include + +#include + +#include "ray/core.h" + +namespace ray { +namespace api { + +/// Represents an object in the object store.. +/// \param T The type of object. +template +class RayObject { + public: + RayObject(); + + RayObject(const ObjectID &id); + + bool operator==(const RayObject &object) const; + + /// Get a untyped ID of the object + const ObjectID &ID() const; + + /// Get the object from the object store. + /// This method will be blocked until the object is ready. + /// + /// \return shared pointer of the result. + std::shared_ptr Get() const; + + /// Make RayObject serializable + MSGPACK_DEFINE(id_); + + private: + ObjectID id_; +}; + +// ---------- implementation ---------- +#include + +template +RayObject::RayObject() {} + +template +RayObject::RayObject(const ObjectID &id) { + id_ = id; +} + +template +inline bool RayObject::operator==(const RayObject &object) const { + return id_ == object.id_; +} + +template +const ObjectID &RayObject::ID() const { + return id_; +} + +template +inline std::shared_ptr RayObject::Get() const { + return Ray::Get(*this); +} +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/include/ray/api/ray_runtime.h b/cpp/include/ray/api/ray_runtime.h new file mode 100644 index 000000000..f87771cd2 --- /dev/null +++ b/cpp/include/ray/api/ray_runtime.h @@ -0,0 +1,46 @@ + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include "ray/core.h" + +namespace ray { +namespace api { + +struct MemberFunctionPtrHolder { + uintptr_t value[2]; +}; + +struct RemoteFunctionPtrHolder { + /// The remote function pointer + uintptr_t function_pointer; + /// The executable function pointer + uintptr_t exec_function_pointer; +}; + +class RayRuntime { + public: + virtual ObjectID Put(std::shared_ptr data) = 0; + virtual std::shared_ptr Get(const ObjectID &id) = 0; + + virtual std::vector> Get( + const std::vector &ids) = 0; + + virtual WaitResult Wait(const std::vector &ids, int num_objects, + int timeout_ms) = 0; + + virtual ObjectID Call(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args) = 0; + virtual ActorID CreateActor(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args) = 0; + virtual ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor, + std::shared_ptr args) = 0; +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/include/ray/api/serializer.h b/cpp/include/ray/api/serializer.h new file mode 100644 index 000000000..c891e691d --- /dev/null +++ b/cpp/include/ray/api/serializer.h @@ -0,0 +1,41 @@ + +#pragma once + +#include +#include + +namespace ray { +namespace api { + +class Serializer { + public: + template + static void Serialize(msgpack::packer &packer, const T &val); + + template + static void Deserialize(msgpack::unpacker &unpacker, T *val); +}; + +// ---------- implementation ---------- + +template +inline void Serializer::Serialize(msgpack::packer &packer, + const T &val) { + packer.pack(val); + return; +} + +template +inline void Serializer::Deserialize(msgpack::unpacker &unpacker, T *val) { + msgpack::object_handle oh; + bool result = unpacker.next(oh); + if (result == false) { + throw RayException("unpack error"); + } + msgpack::object obj = oh.get(); + obj.convert(*val); + return; +} + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/include/ray/api/wait_result.h b/cpp/include/ray/api/wait_result.h new file mode 100644 index 000000000..2fb0036eb --- /dev/null +++ b/cpp/include/ray/api/wait_result.h @@ -0,0 +1,22 @@ + +#pragma once + +#include +#include "ray/core.h" + +namespace ray { +namespace api { + +class WaitResult { + public: + /// The object id array of ready objects + std::vector ready; + /// The object id array of unready objects + std::vector unready; + WaitResult(){}; + WaitResult(std::vector &&ready_objects, + std::vector &&unready_objects) + : ready(std::move(ready_objects)), unready(std::move(unready_objects)){}; +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/include/ray/core.h b/cpp/include/ray/core.h new file mode 100644 index 000000000..764983815 --- /dev/null +++ b/cpp/include/ray/core.h @@ -0,0 +1,13 @@ + +#pragma once + +#include "ray/common/buffer.h" +#include "ray/common/function_descriptor.h" +#include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/common/task/task_common.h" +#include "ray/common/task/task_spec.h" +#include "ray/common/task/task_util.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/store_provider/memory_store/memory_store.h" +#include "ray/util/logging.h" diff --git a/cpp/src/example/example.cc b/cpp/src/example/example.cc new file mode 100644 index 000000000..04ed01dc5 --- /dev/null +++ b/cpp/src/example/example.cc @@ -0,0 +1,76 @@ + +/// This is a complete example of writing a distributed program using the C ++ worker API. + +/// including the header +#include + +/// using namespace +using namespace ray::api; + +/// general function of user code +int Return1() { return 1; } +int Plus1(int x) { return x + 1; } +int Plus(int x, int y) { return x + y; } + +/// a class of user code +class Counter { + public: + int count; + + Counter() { count = 0; } + + static Counter *FactoryCreate() { return new Counter(); } + /// non static function + int Add(int x) { + count += x; + return count; + } +}; + +int main() { + /// initialization + Ray::Init(); + + /// put and get object + auto obj = Ray::Put(123); + auto getRsult = obj.Get(); + + /// general function remote call(args passed by value) + auto r0 = Ray::Call(Return1); + auto r1 = Ray::Call(Plus1, 1); + auto r2 = Ray::Call(Plus, 1, 2); + + int result0 = *(r0.Get()); + int result1 = *(r1.Get()); + int result2 = *(r2.Get()); + + std::cout << "Ray::call with value results: " << result0 << " " << result1 << " " + << result2 << std::endl; + + /// general function remote call(args passed by reference) + auto r3 = Ray::Call(Return1); + auto r4 = Ray::Call(Plus1, r3); + auto r5 = Ray::Call(Plus, r4, 1); + + int result3 = *(r3.Get()); + int result4 = *(r4.Get()); + int result5 = *(r5.Get()); + + std::cout << "Ray::call with reference results: " << result3 << " " << result4 << " " + << result5 << std::endl; + + /// create actor and actor function remote call + RayActor actor = Ray::CreateActor(Counter::FactoryCreate); + auto r6 = actor.Call(&Counter::Add, 5); + auto r7 = actor.Call(&Counter::Add, 1); + auto r8 = actor.Call(&Counter::Add, 1); + auto r9 = actor.Call(&Counter::Add, r8); + + int result6 = *(r6.Get()); + int result7 = *(r7.Get()); + int result8 = *(r8.Get()); + int result9 = *(r9.Get()); + + std::cout << "Ray::call with actor results: " << result6 << " " << result7 << " " + << result8 << " " << result9 << std::endl; +} diff --git a/cpp/src/ray/api.cc b/cpp/src/ray/api.cc new file mode 100644 index 000000000..c10ce6915 --- /dev/null +++ b/cpp/src/ray/api.cc @@ -0,0 +1,20 @@ + +#include + +#include +#include "runtime/abstract_ray_runtime.h" + +namespace ray { +namespace api { + +RayRuntime *Ray::runtime_ = nullptr; + +std::once_flag Ray::is_inited_; +void Ray::Init() { + std::call_once(is_inited_, [] { + runtime_ = AbstractRayRuntime::DoInit(std::make_shared()); + }); +} + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc new file mode 100644 index 000000000..bc00ad21b --- /dev/null +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -0,0 +1,105 @@ + +#include "abstract_ray_runtime.h" + +#include + +#include +#include +#include +#include "../util/address_helper.h" +#include "local_mode_ray_runtime.h" + +namespace ray { +namespace api { +AbstractRayRuntime *AbstractRayRuntime::DoInit(std::shared_ptr config) { + AbstractRayRuntime *runtime; + if (config->runMode == RunMode::SINGLE_PROCESS) { + GenerateBaseAddressOfCurrentLibrary(); + runtime = new LocalModeRayRuntime(config); + } else { + throw RayException("Only single process mode supported now"); + } + RAY_CHECK(runtime); + return runtime; +} + +void AbstractRayRuntime::Put(std::shared_ptr data, + const ObjectID &object_id) { + object_store_->Put(object_id, data); +} + +ObjectID AbstractRayRuntime::Put(std::shared_ptr data) { + ObjectID object_id = + ObjectID::ForPut(worker_->GetCurrentTaskID(), worker_->GetNextPutIndex(), + static_cast(TaskTransportType::RAYLET)); + Put(data, object_id); + return object_id; +} + +std::shared_ptr AbstractRayRuntime::Get(const ObjectID &object_id) { + return object_store_->Get(object_id, -1); +} + +std::vector> AbstractRayRuntime::Get( + const std::vector &ids) { + return object_store_->Get(ids, -1); +} + +WaitResult AbstractRayRuntime::Wait(const std::vector &ids, int num_objects, + int timeout_ms) { + return object_store_->Wait(ids, num_objects, timeout_ms); +} + +ObjectID AbstractRayRuntime::Call(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args) { + InvocationSpec invocationSpec; + invocationSpec.task_id = + TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task + invocationSpec.actor_id = ActorID::Nil(); + invocationSpec.args = args; + invocationSpec.func_offset = + (size_t)(fptr.function_pointer - dynamic_library_base_addr); + invocationSpec.exec_func_offset = + (size_t)(fptr.exec_function_pointer - dynamic_library_base_addr); + return task_submitter_->SubmitTask(invocationSpec); +} + +ActorID AbstractRayRuntime::CreateActor(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args) { + return task_submitter_->CreateActor(fptr, args); +} + +ObjectID AbstractRayRuntime::CallActor(const RemoteFunctionPtrHolder &fptr, + const ActorID &actor, + std::shared_ptr args) { + InvocationSpec invocationSpec; + invocationSpec.task_id = + TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task + invocationSpec.actor_id = actor; + invocationSpec.args = args; + invocationSpec.func_offset = + (size_t)(fptr.function_pointer - dynamic_library_base_addr); + invocationSpec.exec_func_offset = + (size_t)(fptr.exec_function_pointer - dynamic_library_base_addr); + return task_submitter_->SubmitActorTask(invocationSpec); +} + +const TaskID &AbstractRayRuntime::GetCurrentTaskId() { + return worker_->GetCurrentTaskID(); +} + +const JobID &AbstractRayRuntime::GetCurrentJobID() { return worker_->GetCurrentJobID(); } + +ActorID AbstractRayRuntime::GetNextActorID() { + const int next_task_index = worker_->GetNextTaskIndex(); + const ActorID actor_id = ActorID::Of(worker_->GetCurrentJobID(), + worker_->GetCurrentTaskID(), next_task_index); + return actor_id; +} + +const std::unique_ptr &AbstractRayRuntime::GetWorkerContext() { + return worker_; +} + +} // namespace api +} // namespace ray diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.h b/cpp/src/ray/runtime/abstract_ray_runtime.h new file mode 100644 index 000000000..a093c1ec8 --- /dev/null +++ b/cpp/src/ray/runtime/abstract_ray_runtime.h @@ -0,0 +1,62 @@ + +#pragma once + +#include + +#include +#include +#include +#include "./object/object_store.h" +#include "./task/task_executor.h" +#include "./task/task_submitter.h" +#include "ray/core.h" + +namespace ray { +namespace api { + +class AbstractRayRuntime : public RayRuntime { + public: + virtual ~AbstractRayRuntime(){}; + + void Put(std::shared_ptr data, const ObjectID &object_id); + + ObjectID Put(std::shared_ptr data); + + std::shared_ptr Get(const ObjectID &id); + + std::vector> Get(const std::vector &ids); + + WaitResult Wait(const std::vector &ids, int num_objects, int timeout_ms); + + ObjectID Call(RemoteFunctionPtrHolder &fptr, std::shared_ptr args); + + ActorID CreateActor(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args); + + ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor, + std::shared_ptr args); + + ActorID GetNextActorID(); + + const TaskID &GetCurrentTaskId(); + + const JobID &GetCurrentJobID(); + + const std::unique_ptr &GetWorkerContext(); + + protected: + std::shared_ptr config_; + std::unique_ptr worker_; + std::unique_ptr task_submitter_; + std::unique_ptr task_executor_; + std::unique_ptr object_store_; + + private: + static AbstractRayRuntime *DoInit(std::shared_ptr config); + + void Execute(const TaskSpecification &task_spec); + + friend class Ray; +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/local_mode_ray_runtime.cc b/cpp/src/ray/runtime/local_mode_ray_runtime.cc new file mode 100644 index 000000000..898f3cad4 --- /dev/null +++ b/cpp/src/ray/runtime/local_mode_ray_runtime.cc @@ -0,0 +1,22 @@ + +#include "local_mode_ray_runtime.h" + +#include +#include "../util/address_helper.h" +#include "./object/local_mode_object_store.h" +#include "./object/object_store.h" +#include "./task/local_mode_task_submitter.h" + +namespace ray { +namespace api { + +LocalModeRayRuntime::LocalModeRayRuntime(std::shared_ptr config) { + config_ = config; + worker_ = + std::unique_ptr(new WorkerContext(WorkerType::DRIVER, JobID::Nil())); + object_store_ = std::unique_ptr(new LocalModeObjectStore(*this)); + task_submitter_ = std::unique_ptr(new LocalModeTaskSubmitter(*this)); +} + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/local_mode_ray_runtime.h b/cpp/src/ray/runtime/local_mode_ray_runtime.h new file mode 100644 index 000000000..281d0f571 --- /dev/null +++ b/cpp/src/ray/runtime/local_mode_ray_runtime.h @@ -0,0 +1,17 @@ + +#pragma once + +#include +#include "abstract_ray_runtime.h" +#include "ray/core.h" + +namespace ray { +namespace api { + +class LocalModeRayRuntime : public AbstractRayRuntime { + public: + LocalModeRayRuntime(std::shared_ptr config); +}; + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/object/local_mode_object_store.cc b/cpp/src/ray/runtime/object/local_mode_object_store.cc new file mode 100644 index 000000000..cb0268116 --- /dev/null +++ b/cpp/src/ray/runtime/object/local_mode_object_store.cc @@ -0,0 +1,89 @@ + +#include +#include +#include +#include + +#include +#include "../abstract_ray_runtime.h" +#include "local_mode_object_store.h" + +namespace ray { +namespace api { +LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime) + : local_mode_ray_tuntime_(local_mode_ray_tuntime) { + memory_store_ = + std::unique_ptr<::ray::CoreWorkerMemoryStore>(new ::ray::CoreWorkerMemoryStore()); +} + +void LocalModeObjectStore::PutRaw(const ObjectID &object_id, + std::shared_ptr data) { + auto buffer = std::make_shared<::ray::LocalMemoryBuffer>( + reinterpret_cast(data->data()), data->size(), true); + auto status = memory_store_->Put( + ::ray::RayObject(buffer, nullptr, std::vector()), object_id); + if (!status) { + throw RayException("Put object error"); + } +} + +std::shared_ptr LocalModeObjectStore::GetRaw(const ObjectID &object_id, + int timeout_ms) { + std::vector object_ids; + object_ids.push_back(object_id); + auto buffers = GetRaw(object_ids, timeout_ms); + RAY_CHECK(buffers.size() == 1); + return buffers[0]; +} + +std::vector> LocalModeObjectStore::GetRaw( + const std::vector &ids, int timeout_ms) { + std::vector> results; + ::ray::Status status = + memory_store_->Get(ids, (int)ids.size(), timeout_ms, + *local_mode_ray_tuntime_.GetWorkerContext(), false, &results); + if (!status.ok()) { + throw RayException("Get object error: " + status.ToString()); + } + RAY_CHECK(results.size() == ids.size()); + std::vector> result_sbuffers; + result_sbuffers.reserve(results.size()); + for (size_t i = 0; i < results.size(); i++) { + auto data_buffer = results[i]->GetData(); + auto sbuffer = std::make_shared(data_buffer->Size()); + sbuffer->write(reinterpret_cast(data_buffer->Data()), + data_buffer->Size()); + result_sbuffers.push_back(sbuffer); + } + return result_sbuffers; +} + +WaitResult LocalModeObjectStore::Wait(const std::vector &ids, int num_objects, + int timeout_ms) { + absl::flat_hash_set memory_object_ids; + for (const auto &object_id : ids) { + memory_object_ids.insert(object_id); + } + absl::flat_hash_set ready; + ::ray::Status status = + memory_store_->Wait(memory_object_ids, num_objects, timeout_ms, + *local_mode_ray_tuntime_.GetWorkerContext(), &ready); + if (!status.ok()) { + throw RayException("Wait object error: " + status.ToString()); + } + std::vector ready_vector; + ready_vector.reserve(ready.size()); + std::vector unready_vector; + unready_vector.reserve(ids.size() - ready.size()); + for (size_t i = 0; i < ids.size(); i++) { + if (ready.find(ids[i]) != ready.end()) { + ready_vector.push_back(ids[i]); + } else { + unready_vector.push_back(ids[i]); + } + } + WaitResult result(std::move(ready_vector), std::move(unready_vector)); + return result; +} +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/object/local_mode_object_store.h b/cpp/src/ray/runtime/object/local_mode_object_store.h new file mode 100644 index 000000000..3d4f947f4 --- /dev/null +++ b/cpp/src/ray/runtime/object/local_mode_object_store.h @@ -0,0 +1,33 @@ + +#pragma once + +#include +#include "ray/core.h" + +#include "../local_mode_ray_runtime.h" +#include "object_store.h" + +namespace ray { +namespace api { + +class LocalModeObjectStore : public ObjectStore { + public: + LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime); + + WaitResult Wait(const std::vector &ids, int num_objects, int timeout_ms); + + private: + void PutRaw(const ObjectID &object_id, std::shared_ptr data); + + std::shared_ptr GetRaw(const ObjectID &object_id, int timeout_ms); + + std::vector> GetRaw(const std::vector &ids, + int timeout_ms); + + std::unique_ptr<::ray::CoreWorkerMemoryStore> memory_store_; + + LocalModeRayRuntime &local_mode_ray_tuntime_; +}; + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/object/object_store.cc b/cpp/src/ray/runtime/object/object_store.cc new file mode 100644 index 000000000..e944d07a1 --- /dev/null +++ b/cpp/src/ray/runtime/object/object_store.cc @@ -0,0 +1,24 @@ + +#include "object_store.h" + +#include +#include + +namespace ray { +namespace api { + +void ObjectStore::Put(const ObjectID &object_id, std::shared_ptr data) { + PutRaw(object_id, data); +} + +std::shared_ptr ObjectStore::Get(const ObjectID &object_id, + int timeout_ms) { + return GetRaw(object_id, timeout_ms); +} + +std::vector> ObjectStore::Get( + const std::vector &ids, int timeout_ms) { + return GetRaw(ids, timeout_ms); +} +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/object/object_store.h b/cpp/src/ray/runtime/object/object_store.h new file mode 100644 index 000000000..a506a8987 --- /dev/null +++ b/cpp/src/ray/runtime/object/object_store.h @@ -0,0 +1,65 @@ + +#pragma once + +#include + +#include +#include + +namespace ray { +namespace api { + +class ObjectStore { + public: + /// The default timeout to get object. + static const int default_get_timeout_ms = 1000; + + virtual ~ObjectStore(){}; + + /// Store an object in the object store. + /// + /// \param[in] object_id The object which should be stored. + /// \param[in] data The Serialized object buffer which should be stored. + void Put(const ObjectID &object_id, std::shared_ptr data); + + /// Get a single object from the object store. + /// This method will be blocked until the object are ready or wait for timeout. + /// + /// \param[in] object_id The object id which should be got. + /// \param[in] timeout_ms The maximum wait time in milliseconds. + /// \return shared pointer of the result buffer. + std::shared_ptr Get(const ObjectID &object_id, + int timeout_ms = default_get_timeout_ms); + + /// Get a list of objects from the object store. + /// This method will be blocked until all the objects are ready or wait for timeout. + /// + /// \param[in] ids The object id array which should be got. + /// \param[in] timeout_ms The maximum wait time in milliseconds. + /// \return shared pointer array of the result buffer. + std::vector> Get( + const std::vector &ids, int timeout_ms = default_get_timeout_ms); + + /// Wait for a list of RayObjects to be locally available, + /// until specified number of objects are ready, or specified timeout has passed. + /// + /// \param[in] ids The object id array which should be waited. + /// \param[in] num_objects The minimum number of objects to wait. + /// \param[in] timeout_ms The maximum wait time in milliseconds. + /// \return WaitResult Two arrays, one containing locally available objects, one + /// containing the rest. + virtual WaitResult Wait(const std::vector &ids, int num_objects, + int timeout_ms) = 0; + + private: + virtual void PutRaw(const ObjectID &object_id, + std::shared_ptr data) = 0; + + virtual std::shared_ptr GetRaw(const ObjectID &object_id, + int timeout_ms) = 0; + + virtual std::vector> GetRaw( + const std::vector &ids, int timeout_ms) = 0; +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/invocation_spec.h b/cpp/src/ray/runtime/task/invocation_spec.h new file mode 100644 index 000000000..9aa80a33b --- /dev/null +++ b/cpp/src/ray/runtime/task/invocation_spec.h @@ -0,0 +1,22 @@ + +#pragma once + +#include +#include "ray/core.h" + +namespace ray { +namespace api { + +class InvocationSpec { + public: + TaskID task_id; + ActorID actor_id; + int actor_counter; + /// Remote function offset from base address. + size_t func_offset; + /// Executable function offset from base address. + size_t exec_func_offset; + std::shared_ptr args; +}; +} // namespace api +} // namespace ray diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc new file mode 100644 index 000000000..eb5926b49 --- /dev/null +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -0,0 +1,110 @@ + +#include +#include + +#include +#include "../../util/address_helper.h" +#include "../abstract_ray_runtime.h" +#include "local_mode_task_submitter.h" + +namespace ray { +namespace api { + +LocalModeTaskSubmitter::LocalModeTaskSubmitter( + LocalModeRayRuntime &local_mode_ray_tuntime) + : local_mode_ray_tuntime_(local_mode_ray_tuntime) { + thread_pool_.reset(new boost::asio::thread_pool(10)); +} + +ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskType type) { + /// TODO(Guyang Song): Make the infomation of TaskSpecification more reasonable + /// We just reuse the TaskSpecification class and make the single process mode work. + /// Maybe some infomation of TaskSpecification are not reasonable or invalid. + /// We will enhance this after implement the cluster mode. + auto functionDescriptor = FunctionDescriptorBuilder::BuildCpp( + "SingleProcess", std::to_string(invocation.func_offset), + std::to_string(invocation.exec_func_offset)); + rpc::Address address; + std::unordered_map required_resources; + std::unordered_map required_placement_resources; + TaskSpecBuilder builder; + builder.SetCommonTaskSpec(invocation.task_id, rpc::Language::CPP, functionDescriptor, + local_mode_ray_tuntime_.GetCurrentJobID(), + local_mode_ray_tuntime_.GetCurrentTaskId(), 0, + local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1, + required_resources, required_placement_resources); + if (type == TaskType::NORMAL_TASK) { + } else if (type == TaskType::ACTOR_CREATION_TASK) { + builder.SetActorCreationTaskSpec(invocation.actor_id); + } else if (type == TaskType::ACTOR_TASK) { + const TaskID actor_creation_task_id = + TaskID::ForActorCreationTask(invocation.actor_id); + const ObjectID actor_creation_dummy_object_id = ObjectID::ForTaskReturn( + actor_creation_task_id, 1, static_cast(ray::TaskTransportType::RAYLET)); + builder.SetActorTaskSpec(invocation.actor_id, actor_creation_dummy_object_id, + ObjectID(), invocation.actor_counter); + } else { + throw RayException("unknown task type"); + } + auto buffer = std::make_shared<::ray::LocalMemoryBuffer>( + reinterpret_cast(invocation.args->data()), invocation.args->size(), + true); + /// TODO(Guyang Song): Use both 'AddByRefArg' and 'AddByValueArg' to distinguish + builder.AddByValueArg(::ray::RayObject(buffer, nullptr, std::vector())); + auto task_specification = builder.Build(); + ObjectID return_object_id = + task_specification.ReturnId(0, ray::TaskTransportType::RAYLET); + + std::shared_ptr actor; + std::shared_ptr mutex; + if (type == TaskType::ACTOR_TASK) { + absl::MutexLock lock(&actor_contexts_mutex_); + actor = actor_contexts_.at(invocation.actor_id).get()->current_actor; + mutex = actor_contexts_.at(invocation.actor_id).get()->actor_mutex; + } + AbstractRayRuntime *runtime = &local_mode_ray_tuntime_; + if (type == TaskType::ACTOR_CREATION_TASK || type == TaskType::ACTOR_TASK) { + /// TODO(Guyang Song): Handle task dependencies. + /// Execute actor task directly in the main thread because we must guarantee the actor + /// task executed by calling order. + TaskExecutor::Invoke(task_specification, actor, runtime); + } else { + boost::asio::post(*thread_pool_.get(), + std::bind( + [actor, mutex, runtime](TaskSpecification &ts) { + if (mutex) { + absl::MutexLock lock(mutex.get()); + } + TaskExecutor::Invoke(ts, actor, runtime); + }, + std::move(task_specification))); + } + return return_object_id; +} + +ObjectID LocalModeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) { + return Submit(invocation, TaskType::NORMAL_TASK); +} + +ActorID LocalModeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args) { + ActorID id = local_mode_ray_tuntime_.GetNextActorID(); + typedef std::shared_ptr (*ExecFunction)( + uintptr_t base_addr, size_t func_offset, std::shared_ptr args); + ExecFunction exec_function = (ExecFunction)(fptr.exec_function_pointer); + auto data = + (*exec_function)(dynamic_library_base_addr, + (size_t)(fptr.function_pointer - dynamic_library_base_addr), args); + std::unique_ptr actorContext(new ActorContext()); + actorContext->current_actor = data; + absl::MutexLock lock(&actor_contexts_mutex_); + actor_contexts_.emplace(id, std::move(actorContext)); + return id; +} + +ObjectID LocalModeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) { + return Submit(invocation, TaskType::ACTOR_TASK); +} + +} // namespace api +} // namespace ray diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.h b/cpp/src/ray/runtime/task/local_mode_task_submitter.h new file mode 100644 index 000000000..858d2d989 --- /dev/null +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include +#include "../local_mode_ray_runtime.h" +#include "absl/synchronization/mutex.h" +#include "invocation_spec.h" +#include "ray/core.h" +#include "task_executor.h" +#include "task_submitter.h" + +namespace ray { +namespace api { + +class LocalModeTaskSubmitter : public TaskSubmitter { + public: + LocalModeTaskSubmitter(LocalModeRayRuntime &local_mode_ray_tuntime); + + ObjectID SubmitTask(const InvocationSpec &invocation); + + ActorID CreateActor(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args); + + ObjectID SubmitActorTask(const InvocationSpec &invocation); + + private: + std::unordered_map> actor_contexts_; + + absl::Mutex actor_contexts_mutex_; + + std::unique_ptr thread_pool_; + + LocalModeRayRuntime &local_mode_ray_tuntime_; + + ObjectID Submit(const InvocationSpec &invocation, TaskType type); +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc new file mode 100644 index 000000000..d5eb9e16f --- /dev/null +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -0,0 +1,46 @@ + +#include + +#include "../../util/address_helper.h" +#include "../abstract_ray_runtime.h" +#include "task_executor.h" + +namespace ray { +namespace api { + +// TODO(Guyang Song): Make a common task execution function used for both local mode and +// cluster mode. +std::unique_ptr TaskExecutor::Execute(const InvocationSpec &invocation) { + return std::unique_ptr(new ObjectID()); +}; + +void TaskExecutor::Invoke(const TaskSpecification &task_spec, + std::shared_ptr actor, + AbstractRayRuntime *runtime) { + auto args = std::make_shared(task_spec.ArgDataSize(0)); + /// TODO(Guyang Song): Avoid the memory copy. + args->write(reinterpret_cast(task_spec.ArgData(0)), + task_spec.ArgDataSize(0)); + auto function_descriptor = task_spec.FunctionDescriptor(); + auto typed_descriptor = function_descriptor->As(); + std::shared_ptr data; + if (actor) { + typedef std::shared_ptr (*ExecFunction)( + uintptr_t base_addr, size_t func_offset, std::shared_ptr args, + std::shared_ptr object); + ExecFunction exec_function = (ExecFunction)( + dynamic_library_base_addr + std::stoul(typed_descriptor->ExecFunctionOffset())); + data = (*exec_function)(dynamic_library_base_addr, + std::stoul(typed_descriptor->FunctionOffset()), args, actor); + } else { + typedef std::shared_ptr (*ExecFunction)( + uintptr_t base_addr, size_t func_offset, std::shared_ptr args); + ExecFunction exec_function = (ExecFunction)( + dynamic_library_base_addr + std::stoul(typed_descriptor->ExecFunctionOffset())); + data = (*exec_function)(dynamic_library_base_addr, + std::stoul(typed_descriptor->FunctionOffset()), args); + } + runtime->Put(std::move(data), task_spec.ReturnId(0, ray::TaskTransportType::RAYLET)); +} +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/task_executor.h b/cpp/src/ray/runtime/task/task_executor.h new file mode 100644 index 000000000..f68b5b51a --- /dev/null +++ b/cpp/src/ray/runtime/task/task_executor.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include "absl/synchronization/mutex.h" +#include "invocation_spec.h" +#include "ray/core.h" + +namespace ray { +namespace api { + +class AbstractRayRuntime; + +class ActorContext { + public: + std::shared_ptr current_actor = nullptr; + + std::shared_ptr actor_mutex; + + ActorContext() { actor_mutex = std::shared_ptr(new absl::Mutex); } +}; + +class TaskExecutor { + public: + /// TODO(Guyang Song): support multiple tasks execution + std::unique_ptr Execute(const InvocationSpec &invocation); + + static void Invoke(const TaskSpecification &task_spec, + std::shared_ptr actor, + AbstractRayRuntime *runtime); + + virtual ~TaskExecutor(){}; +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/task_submitter.h b/cpp/src/ray/runtime/task/task_submitter.h new file mode 100644 index 000000000..6352608a2 --- /dev/null +++ b/cpp/src/ray/runtime/task/task_submitter.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#include +#include "invocation_spec.h" + +namespace ray { +namespace api { + +class TaskSubmitter { + public: + TaskSubmitter(){}; + + virtual ~TaskSubmitter(){}; + + virtual ObjectID SubmitTask(const InvocationSpec &invocation) = 0; + + virtual ActorID CreateActor(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args) = 0; + + virtual ObjectID SubmitActorTask(const InvocationSpec &invocation) = 0; +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/test/api_test.cc b/cpp/src/ray/test/api_test.cc new file mode 100644 index 000000000..c24a7c2d9 --- /dev/null +++ b/cpp/src/ray/test/api_test.cc @@ -0,0 +1,134 @@ + +#include +#include +#include +#include + +using namespace ray::api; + +int Return1() { return 1; } +int Plus1(int x) { return x + 1; } + +int Plus(int x, int y) { return x + y; } + +class Counter { + public: + int count; + + MSGPACK_DEFINE(count); + + Counter() { count = 0; } + + static Counter *FactoryCreate() { + Counter *counter = new Counter(); + return counter; + } + + int Plus1(int x) { return x + 1; } + + int Plus(int x, int y) { return x + y; } + + int Add(int x) { + count += x; + return count; + } +}; + +TEST(RayApiTest, PutTest) { + Ray::Init(); + + auto obj1 = Ray::Put(1); + auto i1 = obj1.Get(); + EXPECT_EQ(1, *i1); +} + +TEST(RayApiTest, WaitTest) { + Ray::Init(); + auto r0 = Ray::Call(Return1); + auto r1 = Ray::Call(Plus1, 3); + auto r2 = Ray::Call(Plus, 2, 3); + std::vector objects = {r0.ID(), r1.ID(), r2.ID()}; + WaitResult result = Ray::Wait(objects, 3, 1000); + EXPECT_EQ(result.ready.size(), 3); + EXPECT_EQ(result.unready.size(), 0); + std::vector> getResult = Ray::Get(objects); + EXPECT_EQ(getResult.size(), 3); + EXPECT_EQ(*getResult[0], 1); + EXPECT_EQ(*getResult[1], 4); + EXPECT_EQ(*getResult[2], 5); +} + +TEST(RayApiTest, CallWithValueTest) { + auto r0 = Ray::Call(Return1); + auto r1 = Ray::Call(Plus1, 3); + auto r2 = Ray::Call(Plus, 2, 3); + + int result0 = *(r0.Get()); + int result1 = *(r1.Get()); + int result2 = *(r2.Get()); + + EXPECT_EQ(result0, 1); + EXPECT_EQ(result1, 4); + EXPECT_EQ(result2, 5); +} + +TEST(RayApiTest, CallWithObjectTest) { + auto rt0 = Ray::Call(Return1); + auto rt1 = Ray::Call(Plus1, rt0); + auto rt2 = Ray::Call(Plus, rt1, 3); + auto rt3 = Ray::Call(Plus1, 3); + auto rt4 = Ray::Call(Plus, rt2, rt3); + + int return0 = *(rt0.Get()); + int return1 = *(rt1.Get()); + int return2 = *(rt2.Get()); + int return3 = *(rt3.Get()); + int return4 = *(rt4.Get()); + + EXPECT_EQ(return0, 1); + EXPECT_EQ(return1, 2); + EXPECT_EQ(return2, 5); + EXPECT_EQ(return3, 4); + EXPECT_EQ(return4, 9); +} + +TEST(RayApiTest, ActorTest) { + Ray::Init(); + RayActor actor = Ray::CreateActor(Counter::FactoryCreate); + auto rt1 = actor.Call(&Counter::Add, 1); + auto rt2 = actor.Call(&Counter::Add, 2); + auto rt3 = actor.Call(&Counter::Add, 3); + auto rt4 = actor.Call(&Counter::Add, rt3); + + int return1 = *(rt1.Get()); + int return2 = *(rt2.Get()); + int return3 = *(rt3.Get()); + int return4 = *(rt4.Get()); + + EXPECT_EQ(return1, 1); + EXPECT_EQ(return2, 3); + EXPECT_EQ(return3, 6); + EXPECT_EQ(return4, 12); +} + +TEST(RayApiTest, CompareWithFuture) { + // future from a packaged_task + std::packaged_task task(Plus1); + std::future f1 = task.get_future(); + std::thread t(std::move(task), 1); + int rt1 = f1.get(); + + // future from an async() + std::future f2 = std::async(std::launch::async, Plus1, 1); + int rt2 = f2.get(); + + // Ray API + Ray::Init(); + auto f3 = Ray::Call(Plus1, 1); + int rt3 = *f3.Get(); + + EXPECT_EQ(rt1, 2); + EXPECT_EQ(rt2, 2); + EXPECT_EQ(rt3, 2); + t.join(); +} diff --git a/cpp/src/ray/test/serialization_test.cc b/cpp/src/ray/test/serialization_test.cc new file mode 100644 index 000000000..0b79c8094 --- /dev/null +++ b/cpp/src/ray/test/serialization_test.cc @@ -0,0 +1,43 @@ + +#include +#include + +using namespace ray::api; + +TEST(SerializationTest, TypeHybridTest) { + uint32_t in_arg1 = 123456789, out_arg1; + std::string in_arg2 = "123567ABC", out_arg2; + + // 1 arg + // marshall + msgpack::sbuffer buffer1; + msgpack::packer pk1(&buffer1); + Serializer::Serialize(pk1, in_arg1); + // unmarshall + msgpack::unpacker upk1; + upk1.reserve_buffer(buffer1.size()); + memcpy(upk1.buffer(), buffer1.data(), buffer1.size()); + upk1.buffer_consumed(buffer1.size()); + + Serializer::Deserialize(upk1, &out_arg1); + + EXPECT_EQ(in_arg1, out_arg1); + + // 2 args + // marshall + msgpack::sbuffer buffer2; + msgpack::packer pk2(&buffer2); + Serializer::Serialize(pk2, in_arg1); + Serializer::Serialize(pk2, in_arg2); + + // unmarshall + msgpack::unpacker upk2; + upk2.reserve_buffer(buffer2.size()); + memcpy(upk2.buffer(), buffer2.data(), buffer2.size()); + upk2.buffer_consumed(buffer2.size()); + Serializer::Deserialize(upk2, &out_arg1); + Serializer::Deserialize(upk2, &out_arg2); + + EXPECT_EQ(in_arg1, out_arg1); + EXPECT_EQ(in_arg2, out_arg2); +} \ No newline at end of file diff --git a/cpp/src/ray/test/slow_function_test.cc b/cpp/src/ray/test/slow_function_test.cc new file mode 100644 index 000000000..4e4b8ab62 --- /dev/null +++ b/cpp/src/ray/test/slow_function_test.cc @@ -0,0 +1,36 @@ + +#include +#include +#include +#include + +using namespace ray::api; + +int slow_function(int i) { + std::this_thread::sleep_for(std::chrono::seconds(i)); + return i; +} + +TEST(RaySlowFunctionTest, BaseTest) { + Ray::Init(); + auto time1 = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()); + auto r0 = Ray::Call(slow_function, 1); + auto r1 = Ray::Call(slow_function, 2); + auto r2 = Ray::Call(slow_function, 3); + auto r3 = Ray::Call(slow_function, 4); + + int result0 = *(r0.Get()); + int result1 = *(r1.Get()); + int result2 = *(r2.Get()); + int result3 = *(r3.Get()); + auto time2 = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()); + + EXPECT_EQ(result0, 1); + EXPECT_EQ(result1, 2); + EXPECT_EQ(result2, 3); + EXPECT_EQ(result3, 4); + + EXPECT_LT(time2.count() - time1.count(), 4200); +} diff --git a/cpp/src/ray/util/address_helper.cc b/cpp/src/ray/util/address_helper.cc new file mode 100644 index 000000000..92560bebe --- /dev/null +++ b/cpp/src/ray/util/address_helper.cc @@ -0,0 +1,16 @@ +#include +#include + +namespace ray { +namespace api { + +uintptr_t dynamic_library_base_addr; + +extern "C" void GenerateBaseAddressOfCurrentLibrary() { + Dl_info info; + dladdr((void *)GenerateBaseAddressOfCurrentLibrary, &info); + dynamic_library_base_addr = (uintptr_t)info.dli_fbase; + return; +} +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/util/address_helper.h b/cpp/src/ray/util/address_helper.h new file mode 100644 index 000000000..7d41cdf10 --- /dev/null +++ b/cpp/src/ray/util/address_helper.h @@ -0,0 +1,14 @@ +#pragma once +#include +#include + +namespace ray { +namespace api { + +/// A base address which is used to calculate function offset +extern uintptr_t dynamic_library_base_addr; + +/// A fixed C language function which help to get infomation from dladdr +extern "C" void GenerateBaseAddressOfCurrentLibrary(); +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/src/ray/common/function_descriptor.cc b/src/ray/common/function_descriptor.cc index 498ce756e..eeec9279b 100644 --- a/src/ray/common/function_descriptor.cc +++ b/src/ray/common/function_descriptor.cc @@ -44,12 +44,25 @@ FunctionDescriptor FunctionDescriptorBuilder::BuildPython( return ray::FunctionDescriptor(new PythonFunctionDescriptor(std::move(descriptor))); } +FunctionDescriptor FunctionDescriptorBuilder::BuildCpp( + const std::string &lib_name, const std::string &function_offset, + const std::string &exec_function_offset) { + rpc::FunctionDescriptor descriptor; + auto typed_descriptor = descriptor.mutable_cpp_function_descriptor(); + typed_descriptor->set_lib_name(lib_name); + typed_descriptor->set_function_offset(function_offset); + typed_descriptor->set_exec_function_offset(exec_function_offset); + return ray::FunctionDescriptor(new CppFunctionDescriptor(std::move(descriptor))); +} + FunctionDescriptor FunctionDescriptorBuilder::FromProto(rpc::FunctionDescriptor message) { switch (message.function_descriptor_case()) { case ray::FunctionDescriptorType::kJavaFunctionDescriptor: return ray::FunctionDescriptor(new ray::JavaFunctionDescriptor(std::move(message))); case ray::FunctionDescriptorType::kPythonFunctionDescriptor: return ray::FunctionDescriptor(new ray::PythonFunctionDescriptor(std::move(message))); + case ray::FunctionDescriptorType::kCppFunctionDescriptor: + return ray::FunctionDescriptor(new ray::CppFunctionDescriptor(std::move(message))); default: break; } @@ -75,6 +88,13 @@ FunctionDescriptor FunctionDescriptorBuilder::FromVector( function_descriptor_list[2], // function name function_descriptor_list[3] // function hash ); + } else if (language == rpc::Language::CPP) { + RAY_CHECK(function_descriptor_list.size() == 3); + return FunctionDescriptorBuilder::BuildCpp( + function_descriptor_list[0], // lib name + function_descriptor_list[1], // function offset + function_descriptor_list[2] // exec function offset + ); } else { RAY_LOG(FATAL) << "Unspported language " << language; return FunctionDescriptorBuilder::Empty(); diff --git a/src/ray/common/function_descriptor.h b/src/ray/common/function_descriptor.h index 3b21760c7..6c528bf2e 100644 --- a/src/ray/common/function_descriptor.h +++ b/src/ray/common/function_descriptor.h @@ -153,6 +153,44 @@ class PythonFunctionDescriptor : public FunctionDescriptorInterface { const rpc::PythonFunctionDescriptor *typed_message_; }; +class CppFunctionDescriptor : public FunctionDescriptorInterface { + public: + /// Construct from a protobuf message object. + /// The input message will be **copied** into this object. + /// + /// \param message The protobuf message. + explicit CppFunctionDescriptor(rpc::FunctionDescriptor message) + : FunctionDescriptorInterface(std::move(message)) { + RAY_CHECK(message_->function_descriptor_case() == + ray::FunctionDescriptorType::kCppFunctionDescriptor); + typed_message_ = &(message_->cpp_function_descriptor()); + } + + virtual size_t Hash() const { + return std::hash()(ray::FunctionDescriptorType::kCppFunctionDescriptor) ^ + std::hash()(typed_message_->lib_name()) ^ + std::hash()(typed_message_->function_offset()) ^ + std::hash()(typed_message_->exec_function_offset()); + } + + virtual std::string ToString() const { + return "{type=CppFunctionDescriptor, lib_name=" + typed_message_->lib_name() + + ", function_offset=" + typed_message_->function_offset() + + ", exec_function_offset=" + typed_message_->exec_function_offset() + "}"; + } + + std::string LibName() const { return typed_message_->lib_name(); } + + std::string FunctionOffset() const { return typed_message_->function_offset(); } + + std::string ExecFunctionOffset() const { + return typed_message_->exec_function_offset(); + } + + private: + const rpc::CppFunctionDescriptor *typed_message_; +}; + typedef std::shared_ptr FunctionDescriptor; inline bool operator==(const FunctionDescriptor &left, const FunctionDescriptor &right) { @@ -190,6 +228,13 @@ class FunctionDescriptorBuilder { const std::string &function_name, const std::string &function_hash); + /// Build a CppFunctionDescriptor. + /// + /// \return a ray::CppFunctionDescriptor + static FunctionDescriptor BuildCpp(const std::string &lib_name, + const std::string &function_offset, + const std::string &exec_function_offset); + /// Build a ray::FunctionDescriptor according to input message. /// /// \return new ray::FunctionDescriptor diff --git a/src/ray/common/id.h b/src/ray/common/id.h index a269d8872..8555b2000 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -106,6 +107,8 @@ class UniqueID : public BaseID { UniqueID() : BaseID() {} + MSGPACK_DEFINE(id_); + protected: UniqueID(const std::string &binary); @@ -126,6 +129,8 @@ class JobID : public BaseID { JobID() : BaseID() {} + MSGPACK_DEFINE(id_); + private: uint8_t id_[kLength]; }; @@ -171,6 +176,8 @@ class ActorID : public BaseID { /// \return The job id to which this actor belongs. JobID JobId() const; + MSGPACK_DEFINE(id_); + private: uint8_t id_[kLength]; }; @@ -238,6 +245,8 @@ class TaskID : public BaseID { /// \return The `JobID` of the job which creates this task. JobID JobId() const; + MSGPACK_DEFINE(id_); + private: uint8_t id_[kLength]; }; @@ -371,6 +380,8 @@ class ObjectID : public BaseID { /// \return The computed object ID. static ObjectID ForActorHandle(const ActorID &actor_id); + MSGPACK_DEFINE(id_); + private: /// A helper method to generate an ObjectID. static ObjectID GenerateObjectId(const std::string &task_id_binary, diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 22ae0db7d..77da54370 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -67,11 +67,22 @@ message PythonFunctionDescriptor { string function_hash = 4; } +/// Function descriptor for C/C++. +message CppFunctionDescriptor { + /// Dynamic library name which contains the remote function. + string lib_name = 1; + /// Remote function offset from base address. + string function_offset = 2; + /// Executable function offset from base address. + string exec_function_offset = 3; +} + // A union wrapper for various function descriptor types. message FunctionDescriptor { oneof function_descriptor { JavaFunctionDescriptor java_function_descriptor = 1; PythonFunctionDescriptor python_function_descriptor = 2; + CppFunctionDescriptor cpp_function_descriptor = 3; } }