diff --git a/ci/travis/bazel-format.sh b/ci/travis/bazel-format.sh index 3910529a4..a97b97e6f 100755 --- a/ci/travis/bazel-format.sh +++ b/ci/travis/bazel-format.sh @@ -45,6 +45,6 @@ done pushd "$ROOT_DIR"/../.. BAZEL_FILES=(bazel/BUILD bazel/ray.bzl BUILD.bazel java/BUILD.bazel \ - cpp/BUILD.bazel streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE) + cpp/BUILD.bazel cpp/example/BUILD.bazel streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE) buildifier -mode=$RUN_TYPE -diff_command="diff -u" "${BAZEL_FILES[@]}" popd diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index e72380bdb..6267a2321 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -188,6 +188,9 @@ test_cpp() { bazel build --config=ci //cpp:all # shellcheck disable=SC2046 bazel test --config=ci $(./scripts/bazel_export_options) //cpp:all --build_tests_only + # run the cpp example + bazel run //cpp/example:example + } test_wheels() { diff --git a/cpp/BUILD.bazel b/cpp/BUILD.bazel index af82486a0..a4dc5b505 100644 --- a/cpp/BUILD.bazel +++ b/cpp/BUILD.bazel @@ -21,7 +21,6 @@ cc_library( "src/ray/util/*.h", "src/ray/*.cc", "src/ray/*.h", - "src/ray/worker/default_worker.cc", ]), hdrs = glob([ "include/ray/*.h", @@ -45,18 +44,36 @@ cc_library( ) cc_binary( - name = "example", - testonly = 1, + name = "default_worker", srcs = glob([ - "src/example/example.cc", + "src/ray/worker/default_worker.cc", ]), copts = COPTS, - linkstatic = False, + linkstatic = True, deps = [ "ray_api", ], ) +genrule( + name = "ray_cpp_pkg", + srcs = [ + "default_worker", + "ray_api", + ], + outs = ["ray_cpp_pkg.out"], + cmd = """ + WORK_DIR="$$(pwd)" && + mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" && + cp -f $(location default_worker) "$$WORK_DIR/python/ray/core/src/ray/cpp/default_worker" && + cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" && + echo "$$WORK_DIR" > $@ + """, + local = 1, + visibility = ["//visibility:public"], +) + +# test cc_test( name = "api_test", srcs = glob([ @@ -76,27 +93,32 @@ cc_test( srcs = glob([ "src/ray/test/cluster/*.cc", ]), + args = [ + "$(location cluster_mode_test.so)", + ], copts = COPTS, + data = [ + "cluster_mode_test.so", + "ray_cpp_pkg", + ], linkstatic = True, deps = [ "ray_api", + "@com_github_gflags_gflags//:gflags", "@com_google_googletest//:gtest_main", ], ) -genrule( - name = "ray_cpp_pkg", - srcs = [ - "cluster_mode_test", +cc_binary( + name = "cluster_mode_test.so", + srcs = glob([ + "src/ray/test/cluster/*.cc", + ]), + copts = COPTS, + linkstatic = True, + deps = [ "ray_api", + "@com_github_gflags_gflags//:gflags", + "@com_google_googletest//:gtest_main", ], - outs = ["ray_cpp_pkg.out"], - cmd = """ - WORK_DIR="$$(pwd)" && - mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" && - cp -f $(location cluster_mode_test) "$$WORK_DIR/python/ray/core/src/ray/cpp/default_worker" && - cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" && - echo "$$WORK_DIR" > $@ - """, - local = 1, ) diff --git a/cpp/dev_BUILD.bazel b/cpp/dev_BUILD.bazel deleted file mode 100644 index 8c7470b99..000000000 --- a/cpp/dev_BUILD.bazel +++ /dev/null @@ -1,74 +0,0 @@ -# Bazel development build for C++ API. -# C/C++ documentation: https://docs.bazel.build/versions/master/be/c-cpp.html - -load("//bazel:ray.bzl", "COPTS") - -cc_library( - name = "ray_api", - srcs = glob([ - "src/ray/api.cc", - "src/ray/api/*.cc", - "src/ray/api/*.h", - "src/ray/app/*.cc", - "src/ray/app/*.h", - "src/ray/runtime/*.cc", - "src/ray/runtime/*.h", - "src/ray/runtime/**/*.cc", - "src/ray/runtime/**/*.h", - "src/ray/runtime/task/*.cc", - "src/ray/runtime/task/*.h", - "src/ray/util/*.cc", - "src/ray/util/*.h", - "src/ray/*.cc", - "src/ray/*.h", - "src/ray/worker/default_worker.cc", - ]), - hdrs = glob([ - "include/ray/*.h", - "include/ray/**/*.h", - "include/ray/**/**/*.h", - ]), - copts = COPTS, - linkopts = ["-ldl"], - linkstatic = True, - strip_include_prefix = "include", - visibility = ["//visibility:public"], - deps = [ - "//:core_worker_lib", - "//:ray_common", - "//:ray_util", - "@boost//:asio", - "@boost//:thread", - "@com_google_absl//absl/synchronization", - "@msgpack", - ], -) - -cc_binary( - name = "example", - srcs = glob([ - "src/ray/example/*.cc", - ]), - copts = COPTS, - linkstatic = True, - deps = [ - "ray_api", - ], -) - -genrule( - name = "ray_cpp_pkg", - srcs = [ - "example", - "ray_api", - ], - outs = ["ray_cpp_pkg.out"], - cmd = """ - WORK_DIR="$$(pwd)" && - mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" && - cp -f $(location example) "$$WORK_DIR/python/ray/core/src/ray/cpp/default_worker" && - cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" && - echo "$$WORK_DIR" > $@ - """, - local = 1, -) diff --git a/cpp/example/BUILD.bazel b/cpp/example/BUILD.bazel new file mode 100644 index 000000000..a14212042 --- /dev/null +++ b/cpp/example/BUILD.bazel @@ -0,0 +1,37 @@ +# Bazel development build for C++ API. +# C/C++ documentation: https://docs.bazel.build/versions/master/be/c-cpp.html + +load("//bazel:ray.bzl", "COPTS") + +cc_binary( + name = "example", + srcs = glob([ + "*.cc", + ]), + args = [ + "--dynamic-library-path $(location example.so)", + ], + copts = COPTS, + data = [ + "example.so", + "//cpp:ray_cpp_pkg", + ], + linkstatic = True, + deps = [ + "//cpp:ray_api", + "@com_github_gflags_gflags//:gflags", + ], +) + +cc_binary( + name = "example.so", + srcs = glob([ + "*.cc", + ]), + copts = COPTS, + linkstatic = True, + deps = [ + "//cpp:ray_api", + "@com_github_gflags_gflags//:gflags", + ], +) diff --git a/cpp/src/ray/example/example.cc b/cpp/example/example.cc similarity index 81% rename from cpp/src/ray/example/example.cc rename to cpp/example/example.cc index 7ada6f1f5..13f82192d 100644 --- a/cpp/src/ray/example/example.cc +++ b/cpp/example/example.cc @@ -1,8 +1,12 @@ +/// This is a complete example of writing a distributed program using the C ++ worker API. + +/// including the header #include #include -#include +#include "gflags/gflags.h" +/// using namespace using namespace ::ray::api; /// general function of user code @@ -32,22 +36,25 @@ class Counter { } }; +DEFINE_string(redis_address, "", "The ip address of redis server."); + +DEFINE_string(dynamic_library_path, "", "The local path of the dynamic library."); + int main(int argc, char **argv) { - /// Currently, we compile `default_worker` and `example` in one single binary, - /// to work around a symbol conflicting issue. - /// This is the main function of the binary, and we use the `is_default_worker` arg to - /// tell if this binary is used as `default_worker` or `example`. - const char *default_worker_magic = "is_default_worker"; - /// `is_default_worker` is the last arg of `argv` - if (argc > 1 && - memcmp(argv[argc - 1], default_worker_magic, strlen(default_worker_magic)) == 0) { - default_worker_main(argc, argv); - return 0; + /// configuration + gflags::ParseCommandLineFlags(&argc, &argv, true); + const std::string dynamic_library_path = FLAGS_dynamic_library_path; + const std::string redis_address = FLAGS_redis_address; + gflags::ShutDownCommandLineFlags(); + RAY_CHECK(!dynamic_library_path.empty()) + << "Please add a local dynamic library by '--dynamic-library-path'"; + ray::api::RayConfig::GetInstance()->lib_name = dynamic_library_path; + if (!redis_address.empty()) { + ray::api::RayConfig::GetInstance()->SetRedisAddress(redis_address); } - /// initialization to cluster mode - ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER; - /// Dynamic library loading is not supported yet. - ray::api::RayConfig::GetInstance()->lib_name = ""; + ::ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER; + + /// initialization Ray::Init(); /// put and get object @@ -86,7 +93,6 @@ int main(int argc, char **argv) { /// general function remote call(args passed by value) auto r0 = Ray::Task(Return1).Remote(); auto r2 = Ray::Task(Plus, 3, 22).Remote(); - int task_result3 = *(Ray::Get(r2)); std::cout << "task_result3 = " << task_result3 << std::endl; @@ -95,7 +101,6 @@ int main(int argc, char **argv) { auto r4 = Ray::Task(Plus1, r3).Remote(); auto r5 = Ray::Task(Plus, r4, r3).Remote(); auto r6 = Ray::Task(Plus, r4, 10).Remote(); - int task_result4 = *(Ray::Get(r6)); int task_result5 = *(Ray::Get(r5)); std::cout << "task_result4 = " << task_result4 << ", task_result5 = " << task_result5 @@ -104,31 +109,30 @@ int main(int argc, char **argv) { /// create actor and actor function remote call with args passed by value ActorHandle actor4 = Ray::Actor(Counter::FactoryCreate, 10).Remote(); auto r10 = actor4.Task(&Counter::Add, 8).Remote(); - int actor_result4 = *(Ray::Get(r10)); std::cout << "actor_result4 = " << actor_result4 << std::endl; /// create actor and task function remote call with args passed by reference ActorHandle actor5 = Ray::Actor(Counter::FactoryCreate, r10, 0).Remote(); - auto r11 = actor5.Task(&Counter::Add, r0).Remote(); auto r12 = actor5.Task(&Counter::Add, r11).Remote(); auto r13 = actor5.Task(&Counter::Add, r10).Remote(); auto r14 = actor5.Task(&Counter::Add, r13).Remote(); auto r15 = Ray::Task(Plus, r0, r11).Remote(); auto r16 = Ray::Task(Plus1, r15).Remote(); - int result12 = *(Ray::Get(r12)); int result14 = *(Ray::Get(r14)); int result11 = *(Ray::Get(r11)); int result13 = *(Ray::Get(r13)); int result16 = *(Ray::Get(r16)); int result15 = *(Ray::Get(r15)); - std::cout << "Final result:" << std::endl; std::cout << "result11 = " << result11 << ", result12 = " << result12 << ", result13 = " << result13 << ", result14 = " << result14 << ", result15 = " << result15 << ", result16 = " << result16 << std::endl; + + /// shutdown Ray::Shutdown(); + return 0; } diff --git a/cpp/include/ray/api/ray_config.h b/cpp/include/ray/api/ray_config.h index b6bc55d5d..b8c4f0cd2 100644 --- a/cpp/include/ray/api/ray_config.h +++ b/cpp/include/ray/api/ray_config.h @@ -34,6 +34,13 @@ class RayConfig { static std::shared_ptr GetInstance(); + void SetRedisAddress(const std::string address) { + auto pos = address.find(':'); + RAY_CHECK(pos != std::string::npos); + redis_ip = address.substr(0, pos); + redis_port = std::stoi(address.substr(pos + 1, address.length())); + } + private: static std::shared_ptr config_; }; diff --git a/cpp/include/ray/experimental/default_worker.h b/cpp/include/ray/experimental/default_worker.h deleted file mode 100644 index 2c0e02259..000000000 --- a/cpp/include/ray/experimental/default_worker.h +++ /dev/null @@ -1,9 +0,0 @@ -#pragma once - -namespace ray { -namespace api { - -int default_worker_main(int argc, char **argv); - -} // namespace api -} // namespace ray diff --git a/cpp/src/example/example.cc b/cpp/src/example/example.cc deleted file mode 100644 index 1375136ca..000000000 --- a/cpp/src/example/example.cc +++ /dev/null @@ -1,76 +0,0 @@ - -/// This is a complete example of writing a distributed program using the C ++ worker API. - -/// including the header -#include - -/// using namespace -using namespace ::ray::api; - -/// general function of user code -int Return1() { return 1; } -int Plus1(int x) { return x + 1; } -int Plus(int x, int y) { return x + y; } - -/// a class of user code -class Counter { - public: - int count; - - Counter() { count = 0; } - - static Counter *FactoryCreate() { return new Counter(); } - /// non static function - int Add(int x) { - count += x; - return count; - } -}; - -int main() { - /// initialization - Ray::Init(); - - /// put and get object - auto obj = Ray::Put(123); - auto get_result = obj.Get(); - - /// general function remote call(args passed by value) - auto r0 = Ray::Task(Return1).Remote(); - auto r1 = Ray::Task(Plus1, 1).Remote(); - auto r2 = Ray::Task(Plus, 1, 2).Remote(); - - int result0 = *(r0.Get()); - int result1 = *(r1.Get()); - int result2 = *(r2.Get()); - - std::cout << "Ray::call with value results: " << result0 << " " << result1 << " " - << result2 << std::endl; - - /// general function remote call(args passed by reference) - auto r3 = Ray::Task(Return1).Remote(); - auto r4 = Ray::Task(Plus1, r3).Remote(); - auto r5 = Ray::Task(Plus, r4, 1).Remote(); - - int result3 = *(r3.Get()); - int result4 = *(r4.Get()); - int result5 = *(r5.Get()); - - std::cout << "Ray::call with reference results: " << result3 << " " << result4 << " " - << result5 << std::endl; - - /// create actor and actor function remote call - ActorHandle actor = Ray::Actor(Counter::FactoryCreate).Remote(); - auto r6 = actor.Task(&Counter::Add, 5).Remote(); - auto r7 = actor.Task(&Counter::Add, 1).Remote(); - auto r8 = actor.Task(&Counter::Add, 1).Remote(); - auto r9 = actor.Task(&Counter::Add, r8).Remote(); - - int result6 = *(r6.Get()); - int result7 = *(r7.Get()); - int result8 = *(r8.Get()); - int result9 = *(r9.Get()); - - std::cout << "Ray::call with actor results: " << result6 << " " << result7 << " " - << result8 << " " << result9 << std::endl; -} diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index f2b06af09..d0879112f 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -29,7 +29,7 @@ Status TaskExecutor::ExecuteTask( const std::vector &arg_reference_ids, const std::vector &return_ids, const std::string &debugger_breakpoint, std::vector> *results) { - RAY_LOG(INFO) << "TaskExecutor::ExecuteTask"; + RAY_LOG(INFO) << "Execute task: " << TaskType_Name(task_type); RAY_CHECK(ray_function.GetLanguage() == Language::CPP); auto function_descriptor = ray_function.GetFunctionDescriptor(); RAY_CHECK(function_descriptor->Type() == diff --git a/cpp/src/ray/test/cluster/cluster_mode_test.cc b/cpp/src/ray/test/cluster/cluster_mode_test.cc index 780fb0d30..e00c6af14 100644 --- a/cpp/src/ray/test/cluster/cluster_mode_test.cc +++ b/cpp/src/ray/test/cluster/cluster_mode_test.cc @@ -2,7 +2,6 @@ #include #include #include -#include using namespace ::ray::api; @@ -33,11 +32,16 @@ class Counter { } }; +std::string lib_name = ""; + +std::string redis_ip = ""; + TEST(RayClusterModeTest, FullTest) { /// initialization to cluster mode ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER; /// TODO(Guyang Song): add the dynamic library name - ray::api::RayConfig::GetInstance()->lib_name = ""; + ray::api::RayConfig::GetInstance()->lib_name = lib_name; + ray::api::RayConfig::GetInstance()->redis_ip = redis_ip; Ray::Init(); /// put and get object @@ -144,18 +148,11 @@ TEST(RayClusterModeTest, FullTest) { Ray::Shutdown(); } -/// TODO(Guyang Song): Separate default worker from this test. -/// Currently, we compile `default_worker` and `cluster_mode_test` in one single binary, -/// to work around a symbol conflicting issue. -/// This is the main function of the binary, and we use the `is_default_worker` arg to -/// tell if this binary is used as `default_worker` or `cluster_mode_test`. int main(int argc, char **argv) { - const char *default_worker_magic = "is_default_worker"; - /// `is_default_worker` is the last arg of `argv` - if (argc > 1 && - memcmp(argv[argc - 1], default_worker_magic, strlen(default_worker_magic)) == 0) { - default_worker_main(argc, argv); - return 0; + RAY_CHECK(argc == 2 || argc == 3); + lib_name = std::string(argv[1]); + if (argc == 3) { + redis_ip = std::string(argv[2]); } ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/cpp/src/ray/util/function_helper.cc b/cpp/src/ray/util/function_helper.cc index 5dfa8a012..8693ea6b1 100644 --- a/cpp/src/ray/util/function_helper.cc +++ b/cpp/src/ray/util/function_helper.cc @@ -14,19 +14,14 @@ uintptr_t base_addr = 0; static const uintptr_t BaseAddressForHandle(void *handle) { /// TODO(Guyang Song): Implement a cross-platform function. - /// Not Implemented. - return -1; + return (uintptr_t)((NULL == handle) ? NULL : (void *)*(size_t const *)(handle)); } uintptr_t FunctionHelper::LoadLibrary(std::string lib_name) { - if (dynamic_library_base_addr != 0) { - /// Base address has been generated. - return dynamic_library_base_addr; - } /// Generate base address from library. RAY_LOG(INFO) << "Start load library " << lib_name; - void *example = dlopen(lib_name.c_str(), RTLD_LAZY); - uintptr_t base_addr = BaseAddressForHandle(example); + void *handle = dlopen(lib_name.c_str(), RTLD_LAZY); + uintptr_t base_addr = BaseAddressForHandle(handle); RAY_CHECK(base_addr > 0); RAY_LOG(INFO) << "Loaded library " << lib_name << " to base address " << base_addr; loaded_library_.emplace(lib_name, base_addr); diff --git a/cpp/src/ray/worker/default_worker.cc b/cpp/src/ray/worker/default_worker.cc index 2ebfb8d6c..dd61bb457 100644 --- a/cpp/src/ray/worker/default_worker.cc +++ b/cpp/src/ray/worker/default_worker.cc @@ -3,14 +3,11 @@ #include #include -using namespace ::ray; - -namespace ray { -namespace api { +using namespace ::ray::api; int default_worker_main(int argc, char **argv) { RAY_LOG(INFO) << "CPP default worker started"; - RAY_CHECK(argc == 8); + RAY_CHECK(argc == 7); auto config = ray::api::RayConfig::GetInstance(); config->run_mode = RunMode::CLUSTER; @@ -19,10 +16,7 @@ int default_worker_main(int argc, char **argv) { config->raylet_socket = std::string(argv[2]); config->node_manager_port = std::stoi(std::string(argv[3])); std::string redis_address = std::string(std::string(argv[4])); - auto pos = redis_address.find(':'); - RAY_CHECK(pos != std::string::npos); - config->redis_ip = redis_address.substr(0, pos); - config->redis_port = std::stoi(redis_address.substr(pos + 1, redis_address.length())); + config->SetRedisAddress(redis_address); config->redis_password = std::string(std::string(argv[5])); config->session_dir = std::string(std::string(argv[6])); @@ -32,5 +26,7 @@ int default_worker_main(int argc, char **argv) { return 0; } -} // namespace api -} // namespace ray +int main(int argc, char **argv) { + default_worker_main(argc, argv); + return 0; +} diff --git a/doc/source/index.rst b/doc/source/index.rst index 9edb823b2..be01da3cf 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -117,14 +117,17 @@ Ray provides Python, Java, and *EXPERIMENTAL* C++ API. And Ray uses Tasks (funct | The C++ Ray API is currently experimental with limited support. You can track its development `here `__ and report issues on GitHub. | Run the following commands to get started: | - Build ray from source with *bazel* as shown `here `__. - | - Run `"cd ray/cpp"`. - | - Run `"cp dev_BUILD.bazel BUILD.bazel"`. - | - Modify `src/ray/example/example.cc`. + | - Modify `cpp/example/example.cc`. + | - Run `"bazel build //cpp:example"`. + | Option 1:, run the example directly with a dynamic library path. It will start a Ray cluster automatically. | - Run `"ray stop"`. - | - Run `"bazel build //cpp:all"`. - | - Run `"bazel run //cpp:example"`. + | - Run `"./bazel-bin/cpp/example/example --dynamic-library-path=bazel-bin/cpp/example/example.so"` + | Option 2: connect to an existing Ray cluster with a known redis address (e.g. `127.0.0.1:6379`). + | - Run `"ray stop"`. + | - Run `"ray start --head --port 6379 --redis-password 5241590000000000 --node-manager-port 62665"`. + | - Run `"./bazel-bin/cpp/example/example --dynamic-library-path=bazel-bin/cpp/example/example.so --redis-address=127.0.0.1:6379"`. - .. literalinclude:: ../../cpp/src/ray/example/example.cc + .. literalinclude:: ../../cpp/example/example.cc :language: cpp You can also get started by visiting our `Tutorials `_. For the latest wheels (nightlies), see the `installation page `__. diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 1c4c6497d..996cede11 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1580,13 +1580,9 @@ def build_cpp_worker_command( The command string for starting CPP worker. """ - # TODO(Guyang Song): Remove the arg is_default_worker. - # See `cluster_mode_test.cc` for why this workaround is currently needed - # for C++ workers. command = [ DEFAULT_WORKER_EXECUTABLE, plasma_store_name, raylet_name, - str(node_manager_port), redis_address, redis_password, session_dir, - "is_default_worker" + str(node_manager_port), redis_address, redis_password, session_dir ] return command