mirror of
https://github.com/wassname/ray.git
synced 2026-07-04 07:35:11 +08:00
[cpp worker] support cluster mode and object Put/Get works (#9682)
This commit is contained in:
@@ -33,7 +33,7 @@ int main() {
|
||||
|
||||
/// put and get object
|
||||
auto obj = Ray::Put(123);
|
||||
auto getRsult = obj.Get();
|
||||
auto get_result = obj.Get();
|
||||
|
||||
/// general function remote call(args passed by value)
|
||||
auto r0 = Ray::Task(Return1).Remote();
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
|
||||
/// This is a complete example of writing a distributed program using the C++ worker API.
|
||||
|
||||
/// including the header
|
||||
#include <ray/api.h>
|
||||
#include <ray/api/ray_config.h>
|
||||
#include <ray/util/logging.h>
|
||||
|
||||
/// using namespace
|
||||
using namespace ray::api;
|
||||
|
||||
/// Example driver: starts the runtime in cluster mode, stores one integer in the
/// object store, reads it back, logs it and shuts the runtime down.
int main(int argc, char **argv) {
  RAY_LOG(INFO) << "Start cpp worker example";

  /// Select cluster mode before the runtime is initialized.
  auto ray_config = ray::api::RayConfig::GetInstance();
  ray_config->run_mode = RunMode::CLUSTER;
  /// To connect to an existing ray cluster, set the redis ip first:
  /// ray::api::RayConfig::GetInstance()->redis_ip = "127.0.0.1";
  Ray::Init();

  /// Put an object, then get it back.
  auto object_ref = Ray::Put(123);
  auto fetched_value = *(object_ref.Get());

  RAY_LOG(INFO) << "Get result: " << fetched_value;
  Ray::Shutdown();
}
|
||||
@@ -15,5 +15,7 @@ void Ray::Init() {
|
||||
[] { runtime_ = AbstractRayRuntime::DoInit(RayConfig::GetInstance()); });
|
||||
}
|
||||
|
||||
/// Shuts the runtime down by handing the global RayConfig instance to
/// AbstractRayRuntime::DoShutdown.
void Ray::Shutdown() {
  auto config = RayConfig::GetInstance();
  AbstractRayRuntime::DoShutdown(config);
}
|
||||
|
||||
} // namespace api
|
||||
} // namespace ray
|
||||
@@ -20,22 +20,33 @@ AbstractRayRuntime *AbstractRayRuntime::DoInit(std::shared_ptr<RayConfig> config
|
||||
GenerateBaseAddressOfCurrentLibrary();
|
||||
runtime = new LocalModeRayRuntime(config);
|
||||
} else {
|
||||
ProcessHelper::RayStart();
|
||||
ProcessHelper::getInstance().RayStart(config);
|
||||
runtime = new NativeRayRuntime(config);
|
||||
}
|
||||
RAY_CHECK(runtime);
|
||||
return runtime;
|
||||
}
|
||||
|
||||
void AbstractRayRuntime::DoShutdown(std::shared_ptr<RayConfig> config) {
|
||||
if (config->run_mode == RunMode::CLUSTER) {
|
||||
ProcessHelper::getInstance().RayStop(config);
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores a serialized object by forwarding to the object store.
///
/// \param[in] data The serialized object data buffer to store.
/// \param[out] object_id Passed through to the object store as its output id.
void AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data,
                             ObjectID *object_id) {
  // Pure delegation: the object store implementation does the actual work.
  object_store_->Put(data, object_id);
}
|
||||
|
||||
/// Stores a serialized object under a caller-supplied id.
///
/// \param[in] data The serialized object data buffer to store.
/// \param[in] object_id The id to store the object under.
void AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data,
                             const ObjectID &object_id) {
  // Store exactly once, passing the data buffer first to match
  // ObjectStore::Put(data, object_id).
  object_store_->Put(data, object_id);
}
|
||||
|
||||
/// Stores a serialized object and returns the id it can be fetched with.
///
/// \param[in] data The serialized object data buffer to store.
/// \return The id of the stored object.
ObjectID AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data) {
  // Seed the id from the current task id and the next put index.
  ObjectID object_id =
      ObjectID::FromIndex(worker_->GetCurrentTaskID(), worker_->GetNextPutIndex());
  // Store exactly once, through the pointer overload.
  // NOTE(review): object_id is an out parameter there, so the store may replace the
  // seeded value -- confirm against the ObjectStore implementation in use.
  Put(data, &object_id);
  return object_id;
}
|
||||
|
||||
|
||||
@@ -19,6 +19,8 @@ class AbstractRayRuntime : public RayRuntime {
|
||||
public:
|
||||
virtual ~AbstractRayRuntime(){};
|
||||
|
||||
void Put(std::shared_ptr<msgpack::sbuffer> data, ObjectID *object_id);
|
||||
|
||||
void Put(std::shared_ptr<msgpack::sbuffer> data, const ObjectID &object_id);
|
||||
|
||||
ObjectID Put(std::shared_ptr<msgpack::sbuffer> data);
|
||||
@@ -55,6 +57,8 @@ class AbstractRayRuntime : public RayRuntime {
|
||||
private:
|
||||
static AbstractRayRuntime *DoInit(std::shared_ptr<RayConfig> config);
|
||||
|
||||
static void DoShutdown(std::shared_ptr<RayConfig> config);
|
||||
|
||||
void Execute(const TaskSpecification &task_spec);
|
||||
|
||||
friend class Ray;
|
||||
|
||||
@@ -13,8 +13,6 @@ namespace api {
|
||||
|
||||
NativeRayRuntime::NativeRayRuntime(std::shared_ptr<RayConfig> config) {
|
||||
config_ = config;
|
||||
worker_ = std::unique_ptr<WorkerContext>(
|
||||
new WorkerContext(WorkerType::DRIVER, WorkerID::Nil(), JobID::Nil()));
|
||||
object_store_ = std::unique_ptr<ObjectStore>(new NativeObjectStore(*this));
|
||||
task_submitter_ = std::unique_ptr<TaskSubmitter>(new NativeTaskSubmitter(*this));
|
||||
task_executor_ = std::unique_ptr<TaskExecutor>(new TaskExecutor(*this));
|
||||
|
||||
@@ -18,8 +18,13 @@ LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_t
|
||||
std::unique_ptr<::ray::CoreWorkerMemoryStore>(new ::ray::CoreWorkerMemoryStore());
|
||||
}
|
||||
|
||||
void LocalModeObjectStore::PutRaw(const ObjectID &object_id,
|
||||
std::shared_ptr<msgpack::sbuffer> data) {
|
||||
/// Pointer flavour of PutRaw for local mode: stores `data` under the id that
/// `object_id` points to by delegating to the const-reference overload.
///
/// \param[in] data The serialized object data buffer to store.
/// \param[in] object_id Pointer to the id to store the object under; it is
/// dereferenced unconditionally, so it must not be null.
void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
                                  ObjectID *object_id) {
  // Dereferencing yields an ObjectID lvalue, which selects the const-reference
  // overload directly, without the temporary copy a cast to ObjectID would create.
  PutRaw(data, *object_id);
}
|
||||
|
||||
void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
|
||||
const ObjectID &object_id) {
|
||||
auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
|
||||
reinterpret_cast<uint8_t *>(data->data()), data->size(), true);
|
||||
auto status = memory_store_->Put(
|
||||
|
||||
@@ -17,7 +17,9 @@ class LocalModeObjectStore : public ObjectStore {
|
||||
WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects, int timeout_ms);
|
||||
|
||||
private:
|
||||
void PutRaw(const ObjectID &object_id, std::shared_ptr<msgpack::sbuffer> data);
|
||||
void PutRaw(std::shared_ptr<msgpack::sbuffer> data, ObjectID *object_id);
|
||||
|
||||
void PutRaw(std::shared_ptr<msgpack::sbuffer> data, const ObjectID &object_id);
|
||||
|
||||
std::shared_ptr<msgpack::sbuffer> GetRaw(const ObjectID &object_id, int timeout_ms);
|
||||
|
||||
|
||||
@@ -15,17 +15,60 @@ namespace api {
|
||||
/// Creates the cluster-mode object store for the given runtime.
///
/// \param[in] native_ray_tuntime The runtime this store belongs to; it is kept in
/// the native_ray_tuntime_ member.
NativeObjectStore::NativeObjectStore(NativeRayRuntime &native_ray_tuntime)
    : native_ray_tuntime_(native_ray_tuntime) {
}
|
||||
|
||||
void NativeObjectStore::PutRaw(const ObjectID &object_id,
|
||||
std::shared_ptr<msgpack::sbuffer> data) {}
|
||||
/// Stores a serialized object through the core worker, which reports the object's
/// id via `object_id`.
///
/// \param[in] data The serialized object data buffer to store.
/// \param[out] object_id Handed to CoreWorker::Put as its output id.
/// \throws RayException if the core worker reports a non-ok status.
void NativeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
                               ObjectID *object_id) {
  auto &core_worker = CoreWorkerProcess::GetCoreWorker();
  // NOTE(review): the trailing `true` presumably asks LocalMemoryBuffer to copy the
  // bytes instead of aliasing the sbuffer -- confirm against its constructor.
  auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
      reinterpret_cast<uint8_t *>(data->data()), data->size(), true);
  // No metadata and no contained object ids.
  auto status = core_worker.Put(
      ::ray::RayObject(buffer, nullptr, std::vector<ObjectID>()), {}, object_id);
  if (!status.ok()) {
    // Include the status text, as GetRaw does, so the failure cause is not lost.
    throw RayException("Put object error: " + status.ToString());
  }
}
|
||||
|
||||
/// Stores a serialized object through the core worker under a caller-supplied id.
///
/// \param[in] data The serialized object data buffer to store.
/// \param[in] object_id The id to store the object under.
/// \throws RayException if the core worker reports a non-ok status.
void NativeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
                               const ObjectID &object_id) {
  auto &core_worker = CoreWorkerProcess::GetCoreWorker();
  // NOTE(review): the trailing `true` presumably asks LocalMemoryBuffer to copy the
  // bytes instead of aliasing the sbuffer -- confirm against its constructor.
  auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
      reinterpret_cast<uint8_t *>(data->data()), data->size(), true);
  // No metadata and no contained object ids.
  auto status = core_worker.Put(
      ::ray::RayObject(buffer, nullptr, std::vector<ObjectID>()), {}, object_id);
  if (!status.ok()) {
    // Include the status text, as GetRaw does, so the failure cause is not lost.
    throw RayException("Put object error: " + status.ToString());
  }
}
|
||||
|
||||
std::shared_ptr<msgpack::sbuffer> NativeObjectStore::GetRaw(const ObjectID &object_id,
|
||||
int timeout_ms) {
|
||||
return nullptr;
|
||||
std::vector<ObjectID> object_ids;
|
||||
object_ids.push_back(object_id);
|
||||
auto buffers = GetRaw(object_ids, timeout_ms);
|
||||
RAY_CHECK(buffers.size() == 1);
|
||||
return buffers[0];
|
||||
}
|
||||
|
||||
/// Fetches several objects through the core worker and copies each payload into a
/// freshly allocated msgpack buffer.
///
/// \param[in] ids The ids of the objects to fetch.
/// \param[in] timeout_ms Timeout forwarded unchanged to CoreWorker::Get.
/// \return One serialized data buffer per requested id, in the same order as `ids`.
/// \throws RayException if the core worker reports a non-ok status, or if a returned
/// entry or its data buffer is null.
std::vector<std::shared_ptr<msgpack::sbuffer>> NativeObjectStore::GetRaw(
    const std::vector<ObjectID> &ids, int timeout_ms) {
  auto &core_worker = CoreWorkerProcess::GetCoreWorker();
  std::vector<std::shared_ptr<::ray::RayObject>> results;
  ::ray::Status status = core_worker.Get(ids, timeout_ms, &results);
  if (!status.ok()) {
    throw RayException("Get object error: " + status.ToString());
  }
  RAY_CHECK(results.size() == ids.size());

  std::vector<std::shared_ptr<msgpack::sbuffer>> result_sbuffers;
  result_sbuffers.reserve(results.size());
  for (const auto &result : results) {
    // NOTE(review): a null entry presumably means the object was not fetched (for
    // example because of the timeout) -- confirm against CoreWorker::Get. Either
    // way, fail with an exception instead of dereferencing a null pointer.
    if (result == nullptr) {
      throw RayException("Get object error: object is not available");
    }
    auto data_buffer = result->GetData();
    if (data_buffer == nullptr) {
      throw RayException("Get object error: object has no data");
    }
    auto sbuffer = std::make_shared<msgpack::sbuffer>(data_buffer->Size());
    sbuffer->write(reinterpret_cast<const char *>(data_buffer->Data()),
                   data_buffer->Size());
    result_sbuffers.push_back(sbuffer);
  }
  return result_sbuffers;
}
|
||||
|
||||
WaitResult NativeObjectStore::Wait(const std::vector<ObjectID> &ids, int num_objects,
|
||||
|
||||
@@ -17,7 +17,9 @@ class NativeObjectStore : public ObjectStore {
|
||||
WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects, int timeout_ms);
|
||||
|
||||
private:
|
||||
void PutRaw(const ObjectID &object_id, std::shared_ptr<msgpack::sbuffer> data);
|
||||
void PutRaw(std::shared_ptr<msgpack::sbuffer> data, ObjectID *object_id);
|
||||
|
||||
void PutRaw(std::shared_ptr<msgpack::sbuffer> data, const ObjectID &object_id);
|
||||
|
||||
std::shared_ptr<msgpack::sbuffer> GetRaw(const ObjectID &object_id, int timeout_ms);
|
||||
|
||||
|
||||
@@ -7,8 +7,12 @@
|
||||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
void ObjectStore::Put(const ObjectID &object_id, std::shared_ptr<msgpack::sbuffer> data) {
|
||||
PutRaw(object_id, data);
|
||||
/// Public entry point for storing an object whose id is reported back through
/// `object_id`; the work is done by the PutRaw implementation of the concrete store.
void ObjectStore::Put(std::shared_ptr<msgpack::sbuffer> data, ObjectID *object_id) {
  // Dispatches to the pure-virtual PutRaw(data, ObjectID *) overload.
  PutRaw(data, object_id);
}
|
||||
|
||||
/// Public entry point for storing an object under a caller-supplied id; the work is
/// done by the PutRaw implementation of the concrete store.
void ObjectStore::Put(std::shared_ptr<msgpack::sbuffer> data, const ObjectID &object_id) {
  // Dispatches to the pure-virtual PutRaw(data, const ObjectID &) overload.
  PutRaw(data, object_id);
}
|
||||
|
||||
std::shared_ptr<msgpack::sbuffer> ObjectStore::Get(const ObjectID &object_id,
|
||||
|
||||
@@ -18,9 +18,15 @@ class ObjectStore {
|
||||
|
||||
/// Store an object in the object store.
|
||||
///
|
||||
/// \param[in] data The serialized object data buffer to store.
|
||||
/// \param[out] object_id The id which is allocated to the object.
|
||||
void Put(std::shared_ptr<msgpack::sbuffer> data, ObjectID *object_id);
|
||||
|
||||
/// Store an object in the object store.
|
||||
///
|
||||
/// \param[in] data The serialized object data buffer to store.
|
||||
/// \param[in] object_id The object which should be stored.
|
||||
/// \param[in] data The Serialized object buffer which should be stored.
|
||||
void Put(const ObjectID &object_id, std::shared_ptr<msgpack::sbuffer> data);
|
||||
void Put(std::shared_ptr<msgpack::sbuffer> data, const ObjectID &object_id);
|
||||
|
||||
/// Get a single object from the object store.
|
||||
/// This method will be blocked until the object are ready or wait for timeout.
|
||||
@@ -52,8 +58,10 @@ class ObjectStore {
|
||||
int timeout_ms) = 0;
|
||||
|
||||
private:
|
||||
virtual void PutRaw(const ObjectID &object_id,
|
||||
std::shared_ptr<msgpack::sbuffer> data) = 0;
|
||||
virtual void PutRaw(std::shared_ptr<msgpack::sbuffer> data, ObjectID *object_id) = 0;
|
||||
|
||||
virtual void PutRaw(std::shared_ptr<msgpack::sbuffer> data,
|
||||
const ObjectID &object_id) = 0;
|
||||
|
||||
virtual std::shared_ptr<msgpack::sbuffer> GetRaw(const ObjectID &object_id,
|
||||
int timeout_ms) = 0;
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <ray/api.h>
|
||||
#include <ray/api/ray_config.h>
|
||||
|
||||
using namespace ray::api;
|
||||
|
||||
/// Round-trips one integer through the object store in cluster mode.
TEST(RayClusterModeTest, PutTest) {
  ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER;
  Ray::Init();

  auto object_ref = Ray::Put(12345);
  auto fetched = object_ref.Get();
  EXPECT_EQ(12345, *fetched);

  Ray::Shutdown();
}
|
||||
@@ -1,9 +1,97 @@
|
||||
#include "process_helper.h"
|
||||
#include "hiredis/hiredis.h"
|
||||
#include "ray/core.h"
|
||||
#include "ray/util/process.h"
|
||||
#include "ray/util/util.h"
|
||||
|
||||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
void ProcessHelper::RayStart() { return; }
|
||||
static std::string GetSessionDir(std::string redis_ip, int port, std::string password) {
|
||||
redisContext *context = redisConnect(redis_ip.c_str(), port);
|
||||
RAY_CHECK(context != nullptr && !context->err);
|
||||
if (!password.empty()) {
|
||||
auto auth_reply = (redisReply *)redisCommand(context, "AUTH %s", password.c_str());
|
||||
RAY_CHECK(auth_reply->type != REDIS_REPLY_ERROR);
|
||||
freeReplyObject(auth_reply);
|
||||
}
|
||||
auto reply = (redisReply *)redisCommand(context, "GET session_dir");
|
||||
RAY_CHECK(reply->type != REDIS_REPLY_ERROR);
|
||||
std::string session_dir(reply->str);
|
||||
freeReplyObject(reply);
|
||||
redisFree(context);
|
||||
return session_dir;
|
||||
}
|
||||
|
||||
/// Launches `ray start --head` with the given redis port and password, then waits
/// five seconds before returning.
///
/// \param[in] redis_port Value passed as `--redis-port`.
/// \param[in] redis_password Value passed as `--redis-password`.
static void StartRayNode(int redis_port, std::string redis_password) {
  std::vector<std::string> start_command{"ray",
                                         "start",
                                         "--head",
                                         "--redis-port",
                                         std::to_string(redis_port),
                                         "--redis-password",
                                         redis_password};
  RAY_LOG(INFO) << CreateCommandLine(start_command);
  // Spawn returns a pair whose second member is the error code; it must be clear.
  RAY_CHECK(!Process::Spawn(start_command, true).second);
  // Fixed wait after spawning the command.
  // NOTE(review): presumably gives the head node time to come up -- confirm.
  sleep(5);
}
|
||||
|
||||
/// Runs `ray stop`, then waits one second before returning.
static void StopRayNode() {
  std::vector<std::string> cmdargs({"ray", "stop"});
  RAY_LOG(INFO) << CreateCommandLine(cmdargs);
  // Spawn returns a pair whose second member is the error code; it must be clear.
  RAY_CHECK(!Process::Spawn(cmdargs, true).second);
  // POSIX allows usleep() to fail with EINVAL for intervals of one million
  // microseconds or more, so wait the same second with sleep(), as StartRayNode does.
  sleep(1);
}
|
||||
|
||||
void ProcessHelper::RayStart(std::shared_ptr<RayConfig> config) {
|
||||
std::string redis_ip = config->redis_ip;
|
||||
if (redis_ip.empty()) {
|
||||
redis_ip = "127.0.0.1";
|
||||
StartRayNode(config->redis_port, config->redis_password);
|
||||
}
|
||||
|
||||
auto session_dir = GetSessionDir(redis_ip, config->redis_port, config->redis_password);
|
||||
|
||||
gcs::GcsClientOptions gcs_options =
|
||||
gcs::GcsClientOptions(redis_ip, config->redis_port, config->redis_password);
|
||||
|
||||
CoreWorkerOptions options = {
|
||||
config->worker_type, // worker_type
|
||||
Language::CPP, // langauge
|
||||
session_dir + "/sockets/plasma_store", // store_socket
|
||||
session_dir + "/sockets/raylet", // raylet_socket
|
||||
JobID::FromInt(1), // job_id
|
||||
gcs_options, // gcs_options
|
||||
true, // enable_logging
|
||||
"", // log_dir
|
||||
true, // install_failure_signal_handler
|
||||
"127.0.0.1", // node_ip_address
|
||||
config->node_manager_port, // node_manager_port
|
||||
"127.0.0.1", // raylet_ip_address
|
||||
"cpp_worker", // driver_name
|
||||
"", // stdout_file
|
||||
"", // stderr_file
|
||||
nullptr, // task_execution_callback
|
||||
nullptr, // check_signals
|
||||
nullptr, // gc_collect
|
||||
nullptr, // spill_objects
|
||||
nullptr, // restore_spilled_objects
|
||||
nullptr, // get_lang_stack
|
||||
nullptr, // kill_main
|
||||
true, // ref_counting_enabled
|
||||
false, // is_local_mode
|
||||
1, // num_workers
|
||||
nullptr, // terminate_asyncio_thread
|
||||
"", // serialized_job_config
|
||||
-1, // metrics_agent_port
|
||||
};
|
||||
CoreWorkerProcess::Initialize(options);
|
||||
}
|
||||
|
||||
void ProcessHelper::RayStop(std::shared_ptr<RayConfig> config) {
|
||||
CoreWorkerProcess::Shutdown();
|
||||
if (config->redis_ip.empty()) {
|
||||
StopRayNode();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace api
|
||||
} // namespace ray
|
||||
@@ -1,11 +1,26 @@
|
||||
#pragma once
|
||||
#include <ray/api/ray_config.h>
|
||||
#include <string>
|
||||
#include "ray/core.h"
|
||||
|
||||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
class ProcessHelper {
|
||||
public:
|
||||
static void RayStart();
|
||||
void RayStart(std::shared_ptr<RayConfig> config);
|
||||
void RayStop(std::shared_ptr<RayConfig> config);
|
||||
|
||||
static ProcessHelper &getInstance() {
|
||||
static ProcessHelper processHelper;
|
||||
return processHelper;
|
||||
}
|
||||
|
||||
ProcessHelper(ProcessHelper const &) = delete;
|
||||
void operator=(ProcessHelper const &) = delete;
|
||||
|
||||
private:
|
||||
ProcessHelper(){};
|
||||
};
|
||||
} // namespace api
|
||||
} // namespace ray
|
||||
@@ -0,0 +1,83 @@
|
||||
#define BOOST_BIND_NO_PLACEHOLDERS
|
||||
#include "ray/core_worker/context.h"
|
||||
#include "ray/core_worker/core_worker.h"
|
||||
|
||||
using namespace std::placeholders;
|
||||
|
||||
namespace ray {
|
||||
|
||||
namespace api {
|
||||
|
||||
class DefaultWorker {
|
||||
public:
|
||||
DefaultWorker(const std::string &store_socket, const std::string &raylet_socket,
|
||||
int node_manager_port, const gcs::GcsClientOptions &gcs_options,
|
||||
const std::string &session_dir) {
|
||||
CoreWorkerOptions options = {
|
||||
WorkerType::WORKER, // worker_type
|
||||
Language::CPP, // langauge
|
||||
store_socket, // store_socket
|
||||
raylet_socket, // raylet_socket
|
||||
JobID::FromInt(1), // job_id
|
||||
gcs_options, // gcs_options
|
||||
true, // enable_logging
|
||||
session_dir + "/logs", // log_dir
|
||||
true, // install_failure_signal_handler
|
||||
"127.0.0.1", // node_ip_address
|
||||
node_manager_port, // node_manager_port
|
||||
"127.0.0.1", // raylet_ip_address
|
||||
"", // driver_name
|
||||
"", // stdout_file
|
||||
"", // stderr_file
|
||||
std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6,
|
||||
_7), // task_execution_callback
|
||||
nullptr, // check_signals
|
||||
nullptr, // gc_collect
|
||||
nullptr, // spill_objects
|
||||
nullptr, // restore_spilled_objects
|
||||
nullptr, // get_lang_stack
|
||||
nullptr, // kill_main
|
||||
true, // ref_counting_enabled
|
||||
false, // is_local_mode
|
||||
1, // num_workers
|
||||
nullptr, // terminate_asyncio_thread
|
||||
"", // serialized_job_config
|
||||
-1, // metrics_agent_port
|
||||
};
|
||||
|
||||
CoreWorkerProcess::Initialize(options);
|
||||
}
|
||||
|
||||
void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); }
|
||||
|
||||
private:
|
||||
Status ExecuteTask(TaskType task_type, const RayFunction &ray_function,
|
||||
const std::unordered_map<std::string, double> &required_resources,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args,
|
||||
const std::vector<ObjectID> &arg_reference_ids,
|
||||
const std::vector<ObjectID> &return_ids,
|
||||
std::vector<std::shared_ptr<RayObject>> *results) {
|
||||
/// TODO(Guyang Song): Make task execution worked.
|
||||
return Status::TypeError("Task executor not implemented");
|
||||
}
|
||||
};
|
||||
} // namespace api
|
||||
} // namespace ray
|
||||
|
||||
/// Entry point of the C++ default worker.
///
/// Expects exactly five arguments after the program name: store socket, raylet
/// socket, node manager port, redis password and session directory.
int main(int argc, char **argv) {
  RAY_LOG(INFO) << "CPP default worker started";

  RAY_CHECK(argc == 6);
  const std::string store_socket(argv[1]);
  const std::string raylet_socket(argv[2]);
  const int node_manager_port = std::stoi(argv[3]);
  const std::string redis_password(argv[4]);
  const std::string session_dir(argv[5]);

  /// TODO(Guyang Song): Delete this hard code and get address from redis.
  ray::gcs::GcsClientOptions gcs_options("127.0.0.1", 6379, redis_password);

  ray::api::DefaultWorker worker(store_socket, raylet_socket, node_manager_port,
                                 gcs_options, session_dir);
  worker.RunTaskExecutionLoop();
  return 0;
}
|
||||
Reference in New Issue
Block a user