From cb70864c04a16cbdc162c78f95da7d13cd1b38c6 Mon Sep 17 00:00:00 2001 From: SongGuyang Date: Fri, 28 Aug 2020 13:53:36 +0800 Subject: [PATCH] [cpp worker] support cluster mode and object Put/Get works (#9682) --- .bazelrc | 9 ++ .travis.yml | 2 +- ci/travis/ci.sh | 1 + cpp/BUILD.bazel | 92 +++++++++++++------ cpp/include/ray/api.h | 3 + cpp/include/ray/api/ray_config.h | 15 ++- cpp/include/ray/core.h | 4 + cpp/src/example/example.cc | 2 +- cpp/src/example/example_cluster_mode.cc | 27 ++++++ cpp/src/ray/api.cc | 2 + cpp/src/ray/runtime/abstract_ray_runtime.cc | 17 +++- cpp/src/ray/runtime/abstract_ray_runtime.h | 4 + cpp/src/ray/runtime/native_ray_runtime.cc | 2 - .../runtime/object/local_mode_object_store.cc | 9 +- .../runtime/object/local_mode_object_store.h | 4 +- .../ray/runtime/object/native_object_store.cc | 51 +++++++++- .../ray/runtime/object/native_object_store.h | 4 +- cpp/src/ray/runtime/object/object_store.cc | 8 +- cpp/src/ray/runtime/object/object_store.h | 16 +++- cpp/src/ray/test/cluster/cluster_mode_test.cc | 15 +++ cpp/src/ray/util/process_helper.cc | 90 +++++++++++++++++- cpp/src/ray/util/process_helper.h | 17 +++- cpp/src/ray/worker/default_worker.cc | 83 +++++++++++++++++ python/ray/services.py | 49 ++++++++++ src/ray/common/ray_config_def.h | 3 + src/ray/core_worker/common.cc | 2 + src/ray/raylet/main.cc | 13 ++- src/ray/raylet/worker_pool.cc | 4 + 28 files changed, 490 insertions(+), 58 deletions(-) create mode 100644 cpp/src/example/example_cluster_mode.cc create mode 100644 cpp/src/ray/test/cluster/cluster_mode_test.cc create mode 100644 cpp/src/ray/worker/default_worker.cc diff --git a/.bazelrc b/.bazelrc index 38dbaa83f..32af870a8 100644 --- a/.bazelrc +++ b/.bazelrc @@ -13,6 +13,15 @@ build:linux --force_pic build:macos --force_pic build:clang-cl --compiler=clang-cl build:msvc --compiler=msvc-cl +# `LC_ALL` and `LANG` is needed for cpp worker tests, because they will call "ray start". +# If we don't add them, python's `click` library will raise an error. +test --action_env=LC_ALL +test --action_env=LANG +# Allow C++ worker tests to execute "ray start" with the correct version of Python. +test --action_env=VIRTUAL_ENV +test --action_env=PYENV_VIRTUAL_ENV +test --action_env=PYENV_VERSION +test --action_env=PYENV_SHELL # This is needed for some core tests to run correctly test:windows --enable_runfiles # TODO(mehrdadn): Revert the "-\\.(asm|S)$" exclusion when this Bazel bug diff --git a/.travis.yml b/.travis.yml index e2cbeb416..b0d2a02ad 100644 --- a/.travis.yml +++ b/.travis.yml @@ -333,13 +333,13 @@ matrix: env: - TESTSUITE=cpp_worker - PYTHON=3.6 + - PYTHONWARNINGS=ignore install: - . ./ci/travis/ci.sh init before_script: - . ./ci/travis/ci.sh build script: - . ./ci/travis/ci.sh test_cpp - script: # raylet integration tests (core_worker_tests included in bazel tests below) - ./ci/suppress_output bash src/ray/test/run_object_manager_tests.sh diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index c2e37cd54..e9a95289b 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -174,6 +174,7 @@ test_python() { } test_cpp() { + bazel build --config=ci //cpp:all bazel test --config=ci //cpp:all --build_tests_only } diff --git a/cpp/BUILD.bazel b/cpp/BUILD.bazel index 5971b1d41..ed8dfd626 100644 --- a/cpp/BUILD.bazel +++ b/cpp/BUILD.bazel @@ -4,23 +4,7 @@ load("//bazel:ray.bzl", "COPTS") cc_library( - name = "libray_api_header", - hdrs = glob([ - "include/ray/*.h", - "include/ray/**/*.h", - "include/ray/**/**/*.h", - ]), - copts = COPTS, - strip_include_prefix = "include", - deps = [ - "//:core_worker_lib", - "//:ray_common", - "@msgpack", - ], -) - -cc_binary( - name = "libray_api.so", + name = "ray_api", srcs = glob([ "src/ray/api.cc", "src/ray/api/*.cc", @@ -38,13 +22,17 @@ cc_binary( "src/ray/*.cc", "src/ray/*.h", ]), + hdrs = glob([ + "include/ray/*.h", + "include/ray/**/*.h", + "include/ray/**/**/*.h", + ]), copts = COPTS, linkopts = ["-ldl"], - linkshared = 1, - linkstatic = False, + strip_include_prefix = "include", + # linkstatic = False, visibility = ["//visibility:public"], deps = [ - "libray_api_header", "//:core_worker_lib", "//:ray_common", "//:ray_util", @@ -55,20 +43,57 @@ cc_binary( ], ) -cc_import( - name = "ray_api", - shared_library = "libray_api.so", +cc_binary( + name = "default_worker", + srcs = [ + "src/ray/worker/default_worker.cc", + ], + copts = COPTS, + linkstatic = True, + deps = [ + "//:core_worker_lib", + ], +) + +genrule( + name = "ray_cpp_pkg", + srcs = [ + "default_worker", + "ray_api", + ], + outs = ["ray_cpp_pkg.out"], + cmd = """ + WORK_DIR="$$(pwd)" && + mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" && + cp -f $(location default_worker) "$$WORK_DIR/python/ray/core/src/ray/cpp/" && + cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" && + echo "$$WORK_DIR" > $@ + """, + local = 1, ) cc_binary( name = "example", testonly = 1, - srcs = [ + srcs = glob([ "src/example/example.cc", - ], + ]), copts = COPTS, + linkstatic = False, + deps = [ + "ray_api", + ], +) + +cc_binary( + name = "example_cluster_mode", + testonly = 1, + srcs = glob([ + "src/example/example_cluster_mode.cc", + ]), + copts = COPTS, + linkstatic = False, deps = [ - "libray_api_header", "ray_api", ], ) @@ -79,8 +104,21 @@ cc_test( "src/ray/test/*.cc", ]), copts = COPTS, + linkstatic = False, + deps = [ + "ray_api", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "cluster_mode_test", + srcs = glob([ + "src/ray/test/cluster/*.cc", + ]), + copts = COPTS, + linkstatic = False, deps = [ - "libray_api_header", "ray_api", "@com_google_googletest//:gtest_main", ], diff --git a/cpp/include/ray/api.h b/cpp/include/ray/api.h index 24ba96f84..53f8026e3 100644 --- a/cpp/include/ray/api.h +++ b/cpp/include/ray/api.h @@ -33,6 +33,9 @@ class Ray { /// Initialize Ray runtime. static void Init(); + /// Shutdown Ray runtime. + static void Shutdown(); + /// Store an object in the object store. /// /// \param[in] obj The object which should be stored. diff --git a/cpp/include/ray/api/ray_config.h b/cpp/include/ray/api/ray_config.h index 268ae144c..f693e99fc 100644 --- a/cpp/include/ray/api/ray_config.h +++ b/cpp/include/ray/api/ray_config.h @@ -2,22 +2,27 @@ #pragma once #include #include +#include "ray/core.h" namespace ray { namespace api { -enum class RunMode { SINGLE_PROCESS, SINGLE_BOX, CLUSTER }; - -enum class WorkerMode { NONE, DRIVER, WORKER }; +enum class RunMode { SINGLE_PROCESS, CLUSTER }; /// TODO(Guyang Song): Make configuration complete and use to initialize. class RayConfig { public: - WorkerMode worker_mode = WorkerMode::DRIVER; + WorkerType worker_type = WorkerType::DRIVER; RunMode run_mode = RunMode::SINGLE_PROCESS; - std::string redis_address; + std::string redis_ip; + + int redis_port = 6379; + + std::string redis_password = "5241590000000000"; + + int node_manager_port = 62665; static std::shared_ptr GetInstance(); diff --git a/cpp/include/ray/core.h b/cpp/include/ray/core.h index 764983815..9ef405a19 100644 --- a/cpp/include/ray/core.h +++ b/cpp/include/ray/core.h @@ -8,6 +8,10 @@ #include "ray/common/task/task_common.h" #include "ray/common/task/task_spec.h" #include "ray/common/task/task_util.h" +#include "ray/common/test_util.h" +#include "ray/core_worker/common.h" #include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" +#include "ray/gcs/gcs_client.h" #include "ray/util/logging.h" diff --git a/cpp/src/example/example.cc b/cpp/src/example/example.cc index 28a067893..0f10c31e4 100644 --- a/cpp/src/example/example.cc +++ b/cpp/src/example/example.cc @@ -33,7 +33,7 @@ int main() { /// put and get object auto obj = Ray::Put(123); - auto getRsult = obj.Get(); + auto get_result = obj.Get(); /// general function remote call(args passed by value) auto r0 = Ray::Task(Return1).Remote(); diff --git a/cpp/src/example/example_cluster_mode.cc b/cpp/src/example/example_cluster_mode.cc new file mode 100644 index 000000000..ca178fc8b --- /dev/null +++ b/cpp/src/example/example_cluster_mode.cc @@ -0,0 +1,27 @@ + +/// This is a complete example of writing a distributed program using the C ++ worker API. + +/// including the header +#include +#include +#include + +/// using namespace +using namespace ray::api; + +int main(int argc, char **argv) { + RAY_LOG(INFO) << "Start cpp worker example"; + + /// initialization to cluster mode + ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER; + /// Set redis ip to connect an existing ray cluster. + /// ray::api::RayConfig::GetInstance()->redis_ip = "127.0.0.1"; + Ray::Init(); + + /// put and get object + auto obj = Ray::Put(123); + auto get_result = *(obj.Get()); + + RAY_LOG(INFO) << "Get result: " << get_result; + Ray::Shutdown(); +} diff --git a/cpp/src/ray/api.cc b/cpp/src/ray/api.cc index 9c84865b4..60eb75819 100644 --- a/cpp/src/ray/api.cc +++ b/cpp/src/ray/api.cc @@ -15,5 +15,7 @@ void Ray::Init() { [] { runtime_ = AbstractRayRuntime::DoInit(RayConfig::GetInstance()); }); } +void Ray::Shutdown() { AbstractRayRuntime::DoShutdown(RayConfig::GetInstance()); } + } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 732cf4bd7..92125a580 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -20,22 +20,33 @@ AbstractRayRuntime *AbstractRayRuntime::DoInit(std::shared_ptr config GenerateBaseAddressOfCurrentLibrary(); runtime = new LocalModeRayRuntime(config); } else { - ProcessHelper::RayStart(); + ProcessHelper::getInstance().RayStart(config); runtime = new NativeRayRuntime(config); } RAY_CHECK(runtime); return runtime; } +void AbstractRayRuntime::DoShutdown(std::shared_ptr config) { + if (config->run_mode == RunMode::CLUSTER) { + ProcessHelper::getInstance().RayStop(config); + } +} + +void AbstractRayRuntime::Put(std::shared_ptr data, + ObjectID *object_id) { + object_store_->Put(data, object_id); +} + void AbstractRayRuntime::Put(std::shared_ptr data, const ObjectID &object_id) { - object_store_->Put(object_id, data); + object_store_->Put(data, object_id); } ObjectID AbstractRayRuntime::Put(std::shared_ptr data) { ObjectID object_id = ObjectID::FromIndex(worker_->GetCurrentTaskID(), worker_->GetNextPutIndex()); - Put(data, object_id); + Put(data, &object_id); return object_id; } diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.h b/cpp/src/ray/runtime/abstract_ray_runtime.h index f0bd0e23e..00535e9fa 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.h +++ b/cpp/src/ray/runtime/abstract_ray_runtime.h @@ -19,6 +19,8 @@ class AbstractRayRuntime : public RayRuntime { public: virtual ~AbstractRayRuntime(){}; + void Put(std::shared_ptr data, ObjectID *object_id); + void Put(std::shared_ptr data, const ObjectID &object_id); ObjectID Put(std::shared_ptr data); @@ -55,6 +57,8 @@ class AbstractRayRuntime : public RayRuntime { private: static AbstractRayRuntime *DoInit(std::shared_ptr config); + static void DoShutdown(std::shared_ptr config); + void Execute(const TaskSpecification &task_spec); friend class Ray; diff --git a/cpp/src/ray/runtime/native_ray_runtime.cc b/cpp/src/ray/runtime/native_ray_runtime.cc index e494e663d..04bb17e0d 100644 --- a/cpp/src/ray/runtime/native_ray_runtime.cc +++ b/cpp/src/ray/runtime/native_ray_runtime.cc @@ -13,8 +13,6 @@ namespace api { NativeRayRuntime::NativeRayRuntime(std::shared_ptr config) { config_ = config; - worker_ = std::unique_ptr( - new WorkerContext(WorkerType::DRIVER, WorkerID::Nil(), JobID::Nil())); object_store_ = std::unique_ptr(new NativeObjectStore(*this)); task_submitter_ = std::unique_ptr(new NativeTaskSubmitter(*this)); task_executor_ = std::unique_ptr(new TaskExecutor(*this)); diff --git a/cpp/src/ray/runtime/object/local_mode_object_store.cc b/cpp/src/ray/runtime/object/local_mode_object_store.cc index 66ca5764e..a13b22811 100644 --- a/cpp/src/ray/runtime/object/local_mode_object_store.cc +++ b/cpp/src/ray/runtime/object/local_mode_object_store.cc @@ -18,8 +18,13 @@ LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_t std::unique_ptr<::ray::CoreWorkerMemoryStore>(new ::ray::CoreWorkerMemoryStore()); } -void LocalModeObjectStore::PutRaw(const ObjectID &object_id, - std::shared_ptr data) { +void LocalModeObjectStore::PutRaw(std::shared_ptr data, + ObjectID *object_id) { + PutRaw(data, (const ObjectID)(*object_id)); +} + +void LocalModeObjectStore::PutRaw(std::shared_ptr data, + const ObjectID &object_id) { auto buffer = std::make_shared<::ray::LocalMemoryBuffer>( reinterpret_cast(data->data()), data->size(), true); auto status = memory_store_->Put( diff --git a/cpp/src/ray/runtime/object/local_mode_object_store.h b/cpp/src/ray/runtime/object/local_mode_object_store.h index ee5c6b0db..e5628e67e 100644 --- a/cpp/src/ray/runtime/object/local_mode_object_store.h +++ b/cpp/src/ray/runtime/object/local_mode_object_store.h @@ -17,7 +17,9 @@ class LocalModeObjectStore : public ObjectStore { WaitResult Wait(const std::vector &ids, int num_objects, int timeout_ms); private: - void PutRaw(const ObjectID &object_id, std::shared_ptr data); + void PutRaw(std::shared_ptr data, ObjectID *object_id); + + void PutRaw(std::shared_ptr data, const ObjectID &object_id); std::shared_ptr GetRaw(const ObjectID &object_id, int timeout_ms); diff --git a/cpp/src/ray/runtime/object/native_object_store.cc b/cpp/src/ray/runtime/object/native_object_store.cc index 631c7c139..d11b0487a 100644 --- a/cpp/src/ray/runtime/object/native_object_store.cc +++ b/cpp/src/ray/runtime/object/native_object_store.cc @@ -15,17 +15,60 @@ namespace api { NativeObjectStore::NativeObjectStore(NativeRayRuntime &native_ray_tuntime) : native_ray_tuntime_(native_ray_tuntime) {} -void NativeObjectStore::PutRaw(const ObjectID &object_id, - std::shared_ptr data) {} +void NativeObjectStore::PutRaw(std::shared_ptr data, + ObjectID *object_id) { + auto &core_worker = CoreWorkerProcess::GetCoreWorker(); + auto buffer = std::make_shared<::ray::LocalMemoryBuffer>( + reinterpret_cast(data->data()), data->size(), true); + auto status = core_worker.Put( + ::ray::RayObject(buffer, nullptr, std::vector()), {}, object_id); + if (!status.ok()) { + throw RayException("Put object error"); + } + return; +} + +void NativeObjectStore::PutRaw(std::shared_ptr data, + const ObjectID &object_id) { + auto &core_worker = CoreWorkerProcess::GetCoreWorker(); + auto buffer = std::make_shared<::ray::LocalMemoryBuffer>( + reinterpret_cast(data->data()), data->size(), true); + auto status = core_worker.Put( + ::ray::RayObject(buffer, nullptr, std::vector()), {}, object_id); + if (!status.ok()) { + throw RayException("Put object error"); + } + return; +} std::shared_ptr NativeObjectStore::GetRaw(const ObjectID &object_id, int timeout_ms) { - return nullptr; + std::vector object_ids; + object_ids.push_back(object_id); + auto buffers = GetRaw(object_ids, timeout_ms); + RAY_CHECK(buffers.size() == 1); + return buffers[0]; } std::vector> NativeObjectStore::GetRaw( const std::vector &ids, int timeout_ms) { - return std::vector>(); + auto &core_worker = CoreWorkerProcess::GetCoreWorker(); + std::vector> results; + ::ray::Status status = core_worker.Get(ids, timeout_ms, &results); + if (!status.ok()) { + throw RayException("Get object error: " + status.ToString()); + } + RAY_CHECK(results.size() == ids.size()); + std::vector> result_sbuffers; + result_sbuffers.reserve(results.size()); + for (size_t i = 0; i < results.size(); i++) { + auto data_buffer = results[i]->GetData(); + auto sbuffer = std::make_shared(data_buffer->Size()); + sbuffer->write(reinterpret_cast(data_buffer->Data()), + data_buffer->Size()); + result_sbuffers.push_back(sbuffer); + } + return result_sbuffers; } WaitResult NativeObjectStore::Wait(const std::vector &ids, int num_objects, diff --git a/cpp/src/ray/runtime/object/native_object_store.h b/cpp/src/ray/runtime/object/native_object_store.h index 5e0a5a1f9..0bca77909 100644 --- a/cpp/src/ray/runtime/object/native_object_store.h +++ b/cpp/src/ray/runtime/object/native_object_store.h @@ -17,7 +17,9 @@ class NativeObjectStore : public ObjectStore { WaitResult Wait(const std::vector &ids, int num_objects, int timeout_ms); private: - void PutRaw(const ObjectID &object_id, std::shared_ptr data); + void PutRaw(std::shared_ptr data, ObjectID *object_id); + + void PutRaw(std::shared_ptr data, const ObjectID &object_id); std::shared_ptr GetRaw(const ObjectID &object_id, int timeout_ms); diff --git a/cpp/src/ray/runtime/object/object_store.cc b/cpp/src/ray/runtime/object/object_store.cc index e944d07a1..e60ba1e03 100644 --- a/cpp/src/ray/runtime/object/object_store.cc +++ b/cpp/src/ray/runtime/object/object_store.cc @@ -7,8 +7,12 @@ namespace ray { namespace api { -void ObjectStore::Put(const ObjectID &object_id, std::shared_ptr data) { - PutRaw(object_id, data); +void ObjectStore::Put(std::shared_ptr data, ObjectID *object_id) { + PutRaw(data, object_id); +} + +void ObjectStore::Put(std::shared_ptr data, const ObjectID &object_id) { + PutRaw(data, object_id); } std::shared_ptr ObjectStore::Get(const ObjectID &object_id, diff --git a/cpp/src/ray/runtime/object/object_store.h b/cpp/src/ray/runtime/object/object_store.h index 856ede5d7..8942d590d 100644 --- a/cpp/src/ray/runtime/object/object_store.h +++ b/cpp/src/ray/runtime/object/object_store.h @@ -18,9 +18,15 @@ class ObjectStore { /// Store an object in the object store. /// + /// \param[in] data The serialized object data buffer to store. + /// \param[out] The id which is allocated to the object. + void Put(std::shared_ptr data, ObjectID *object_id); + + /// Store an object in the object store. + /// + /// \param[in] data The serialized object data buffer to store. /// \param[in] object_id The object which should be stored. - /// \param[in] data The Serialized object buffer which should be stored. - void Put(const ObjectID &object_id, std::shared_ptr data); + void Put(std::shared_ptr data, const ObjectID &object_id); /// Get a single object from the object store. /// This method will be blocked until the object are ready or wait for timeout. @@ -52,8 +58,10 @@ class ObjectStore { int timeout_ms) = 0; private: - virtual void PutRaw(const ObjectID &object_id, - std::shared_ptr data) = 0; + virtual void PutRaw(std::shared_ptr data, ObjectID *object_id) = 0; + + virtual void PutRaw(std::shared_ptr data, + const ObjectID &object_id) = 0; virtual std::shared_ptr GetRaw(const ObjectID &object_id, int timeout_ms) = 0; diff --git a/cpp/src/ray/test/cluster/cluster_mode_test.cc b/cpp/src/ray/test/cluster/cluster_mode_test.cc new file mode 100644 index 000000000..98830336a --- /dev/null +++ b/cpp/src/ray/test/cluster/cluster_mode_test.cc @@ -0,0 +1,15 @@ + +#include +#include +#include + +using namespace ray::api; + +TEST(RayClusterModeTest, PutTest) { + ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER; + Ray::Init(); + auto obj1 = Ray::Put(12345); + auto i1 = obj1.Get(); + EXPECT_EQ(12345, *i1); + Ray::Shutdown(); +} \ No newline at end of file diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index da23bd0e4..7fd2ab1e8 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -1,9 +1,97 @@ #include "process_helper.h" +#include "hiredis/hiredis.h" +#include "ray/core.h" +#include "ray/util/process.h" +#include "ray/util/util.h" namespace ray { namespace api { -void ProcessHelper::RayStart() { return; } +static std::string GetSessionDir(std::string redis_ip, int port, std::string password) { + redisContext *context = redisConnect(redis_ip.c_str(), port); + RAY_CHECK(context != nullptr && !context->err); + if (!password.empty()) { + auto auth_reply = (redisReply *)redisCommand(context, "AUTH %s", password.c_str()); + RAY_CHECK(auth_reply->type != REDIS_REPLY_ERROR); + freeReplyObject(auth_reply); + } + auto reply = (redisReply *)redisCommand(context, "GET session_dir"); + RAY_CHECK(reply->type != REDIS_REPLY_ERROR); + std::string session_dir(reply->str); + freeReplyObject(reply); + redisFree(context); + return session_dir; +} + +static void StartRayNode(int redis_port, std::string redis_password) { + std::vector cmdargs({"ray", "start", "--head", "--redis-port", + std::to_string(redis_port), "--redis-password", + redis_password}); + RAY_LOG(INFO) << CreateCommandLine(cmdargs); + RAY_CHECK(!Process::Spawn(cmdargs, true).second); + sleep(5); + return; +} + +static void StopRayNode() { + std::vector cmdargs({"ray", "stop"}); + RAY_LOG(INFO) << CreateCommandLine(cmdargs); + RAY_CHECK(!Process::Spawn(cmdargs, true).second); + usleep(1000 * 1000); + return; +} + +void ProcessHelper::RayStart(std::shared_ptr config) { + std::string redis_ip = config->redis_ip; + if (redis_ip.empty()) { + redis_ip = "127.0.0.1"; + StartRayNode(config->redis_port, config->redis_password); + } + + auto session_dir = GetSessionDir(redis_ip, config->redis_port, config->redis_password); + + gcs::GcsClientOptions gcs_options = + gcs::GcsClientOptions(redis_ip, config->redis_port, config->redis_password); + + CoreWorkerOptions options = { + config->worker_type, // worker_type + Language::CPP, // langauge + session_dir + "/sockets/plasma_store", // store_socket + session_dir + "/sockets/raylet", // raylet_socket + JobID::FromInt(1), // job_id + gcs_options, // gcs_options + true, // enable_logging + "", // log_dir + true, // install_failure_signal_handler + "127.0.0.1", // node_ip_address + config->node_manager_port, // node_manager_port + "127.0.0.1", // raylet_ip_address + "cpp_worker", // driver_name + "", // stdout_file + "", // stderr_file + nullptr, // task_execution_callback + nullptr, // check_signals + nullptr, // gc_collect + nullptr, // spill_objects + nullptr, // restore_spilled_objects + nullptr, // get_lang_stack + nullptr, // kill_main + true, // ref_counting_enabled + false, // is_local_mode + 1, // num_workers + nullptr, // terminate_asyncio_thread + "", // serialized_job_config + -1, // metrics_agent_port + }; + CoreWorkerProcess::Initialize(options); +} + +void ProcessHelper::RayStop(std::shared_ptr config) { + CoreWorkerProcess::Shutdown(); + if (config->redis_ip.empty()) { + StopRayNode(); + } +} } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/util/process_helper.h b/cpp/src/ray/util/process_helper.h index fd9be3a5a..3dcc10ebb 100644 --- a/cpp/src/ray/util/process_helper.h +++ b/cpp/src/ray/util/process_helper.h @@ -1,11 +1,26 @@ #pragma once +#include +#include +#include "ray/core.h" namespace ray { namespace api { class ProcessHelper { public: - static void RayStart(); + void RayStart(std::shared_ptr config); + void RayStop(std::shared_ptr config); + + static ProcessHelper &getInstance() { + static ProcessHelper processHelper; + return processHelper; + } + + ProcessHelper(ProcessHelper const &) = delete; + void operator=(ProcessHelper const &) = delete; + + private: + ProcessHelper(){}; }; } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/worker/default_worker.cc b/cpp/src/ray/worker/default_worker.cc new file mode 100644 index 000000000..2b841012e --- /dev/null +++ b/cpp/src/ray/worker/default_worker.cc @@ -0,0 +1,83 @@ +#define BOOST_BIND_NO_PLACEHOLDERS +#include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" + +using namespace std::placeholders; + +namespace ray { + +namespace api { + +class DefaultWorker { + public: + DefaultWorker(const std::string &store_socket, const std::string &raylet_socket, + int node_manager_port, const gcs::GcsClientOptions &gcs_options, + const std::string &session_dir) { + CoreWorkerOptions options = { + WorkerType::WORKER, // worker_type + Language::CPP, // langauge + store_socket, // store_socket + raylet_socket, // raylet_socket + JobID::FromInt(1), // job_id + gcs_options, // gcs_options + true, // enable_logging + session_dir + "/logs", // log_dir + true, // install_failure_signal_handler + "127.0.0.1", // node_ip_address + node_manager_port, // node_manager_port + "127.0.0.1", // raylet_ip_address + "", // driver_name + "", // stdout_file + "", // stderr_file + std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, + _7), // task_execution_callback + nullptr, // check_signals + nullptr, // gc_collect + nullptr, // spill_objects + nullptr, // restore_spilled_objects + nullptr, // get_lang_stack + nullptr, // kill_main + true, // ref_counting_enabled + false, // is_local_mode + 1, // num_workers + nullptr, // terminate_asyncio_thread + "", // serialized_job_config + -1, // metrics_agent_port + }; + + CoreWorkerProcess::Initialize(options); + } + + void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); } + + private: + Status ExecuteTask(TaskType task_type, const RayFunction &ray_function, + const std::unordered_map &required_resources, + const std::vector> &args, + const std::vector &arg_reference_ids, + const std::vector &return_ids, + std::vector> *results) { + /// TODO(Guyang Song): Make task execution worked. + return Status::TypeError("Task executor not implemented"); + } +}; +} // namespace api +} // namespace ray + +int main(int argc, char **argv) { + RAY_LOG(INFO) << "CPP default worker started"; + + RAY_CHECK(argc == 6); + auto store_socket = std::string(argv[1]); + auto raylet_socket = std::string(argv[2]); + auto node_manager_port = std::stoi(std::string(argv[3])); + auto redis_password = std::string(std::string(argv[4])); + auto session_dir = std::string(std::string(argv[5])); + + /// TODO(Guyang Song): Delete this hard code and get address from redis. + ray::gcs::GcsClientOptions gcs_options("127.0.0.1", 6379, redis_password); + ray::api::DefaultWorker worker(store_socket, raylet_socket, node_manager_port, + gcs_options, session_dir); + worker.RunTaskExecutionLoop(); + return 0; +} diff --git a/python/ray/services.py b/python/ray/services.py index 7acbbf075..4adbb1f85 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -66,6 +66,11 @@ GCS_SERVER_EXECUTABLE = os.path.join( os.path.abspath(os.path.dirname(__file__)), "core/src/ray/gcs/gcs_server" + EXE_SUFFIX) +# Location of the cpp default worker executables. +DEFAULT_WORKER_EXECUTABLE = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "core/src/ray/cpp/default_worker" + EXE_SUFFIX) + DEFAULT_JAVA_WORKER_CLASSPATH = [ os.path.join( os.path.abspath(os.path.dirname(__file__)), "../../../build/java/*"), @@ -1374,6 +1379,19 @@ def start_raylet(redis_address, else: java_worker_command = [] + if os.path.exists(DEFAULT_WORKER_EXECUTABLE): + cpp_worker_command = build_cpp_worker_command( + "", + redis_address, + node_manager_port, + plasma_store_name, + raylet_name, + redis_password, + session_dir, + ) + else: + cpp_worker_command = [] + # Create the command that the Raylet will use to start workers. start_worker_command = [ sys.executable, worker_path, f"--node-ip-address={node_ip_address}", @@ -1438,6 +1456,7 @@ def start_raylet(redis_address, f"--config_list={config_str}", f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}", # noqa f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}", # noqa + f"--cpp_worker_command={subprocess.list2cmdline(cpp_worker_command)}", # noqa f"--redis_password={redis_password or ''}", f"--temp_dir={temp_dir}", f"--session_dir={session_dir}", @@ -1561,6 +1580,36 @@ def build_java_worker_command( return command +def build_cpp_worker_command( + cpp_worker_options, + redis_address, + node_manager_port, + plasma_store_name, + raylet_name, + redis_password, + session_dir, +): + """This method assembles the command used to start a CPP worker. + + Args: + cpp_worker_options (list): The command options for CPP worker. + redis_address (str): Redis address of GCS. + plasma_store_name (str): The name of the plasma store socket to connect + to. + raylet_name (str): The name of the raylet socket to create. + redis_password (str): The password of connect to redis. + session_dir (str): The path of this session. + Returns: + The command string for starting CPP worker. + """ + command = [ + DEFAULT_WORKER_EXECUTABLE, plasma_store_name, raylet_name, + str(node_manager_port), redis_password, session_dir + ] + + return command + + def determine_plasma_store_config(object_store_memory, plasma_directory=None, huge_pages=False): diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 9b0abfcc7..e643daef9 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -199,6 +199,9 @@ RAY_CONFIG(int, num_workers_per_process_python, 1) /// Number of workers per Java worker process RAY_CONFIG(int, num_workers_per_process_java, 10) +/// Number of workers per CPP worker process +RAY_CONFIG(int, num_workers_per_process_cpp, 1) + /// Maximum number of ids in one batch to send to GCS to delete keys. RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000) diff --git a/src/ray/core_worker/common.cc b/src/ray/core_worker/common.cc index 7470807d2..5f0951800 100644 --- a/src/ray/core_worker/common.cc +++ b/src/ray/core_worker/common.cc @@ -34,6 +34,8 @@ std::string LanguageString(Language language) { return "python"; } else if (language == Language::JAVA) { return "java"; + } else if (language == Language::CPP) { + return "cpp"; } RAY_CHECK(false); return ""; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 8e0ba07c3..4d75d8d65 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -45,6 +45,7 @@ DEFINE_string(config_list, "", "The raylet config list of this node."); DEFINE_string(python_worker_command, "", "Python worker command."); DEFINE_string(java_worker_command, "", "Java worker command."); DEFINE_string(agent_command, "", "Dashboard agent command."); +DEFINE_string(cpp_worker_command, "", "CPP worker command."); DEFINE_string(redis_password, "", "The password of redis."); DEFINE_string(temp_dir, "", "Temporary directory."); DEFINE_string(session_dir, "", "The path of this ray session directory."); @@ -84,6 +85,7 @@ int main(int argc, char *argv[]) { const std::string python_worker_command = FLAGS_python_worker_command; const std::string java_worker_command = FLAGS_java_worker_command; const std::string agent_command = FLAGS_agent_command; + const std::string cpp_worker_command = FLAGS_cpp_worker_command; const std::string redis_password = FLAGS_redis_password; const std::string temp_dir = FLAGS_temp_dir; const std::string session_dir = FLAGS_session_dir; @@ -182,9 +184,14 @@ int main(int argc, char *argv[]) { node_manager_config.worker_commands.emplace( make_pair(ray::Language::JAVA, ParseCommandLine(java_worker_command))); } - if (python_worker_command.empty() && java_worker_command.empty()) { - RAY_CHECK(0) << "Either Python worker command or Java worker command should be " - "provided."; + if (!cpp_worker_command.empty()) { + node_manager_config.worker_commands.emplace( + make_pair(ray::Language::CPP, ParseCommandLine(cpp_worker_command))); + } + if (python_worker_command.empty() && java_worker_command.empty() && + cpp_worker_command.empty()) { + RAY_LOG(FATAL) << "At least one of Python/Java/CPP worker command " + << "should be provided"; } if (!agent_command.empty()) { node_manager_config.agent_command = agent_command; diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index ef949ecbb..8b759ec91 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -90,6 +90,10 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, state.num_workers_per_process = RayConfig::instance().num_workers_per_process_java(); break; + case Language::CPP: + state.num_workers_per_process = + RayConfig::instance().num_workers_per_process_cpp(); + break; default: RAY_LOG(FATAL) << "The number of workers per process for " << Language_Name(entry.first) << " worker is not set.";