[Core worker] add store & task provider (#4966)

This commit is contained in:
Zhijun Fu
2019-06-14 18:35:32 +08:00
committed by Hao Chen
parent 3c92b2ee4d
commit 37abdb283f
19 changed files with 565 additions and 218 deletions
+4
View File
@@ -111,6 +111,8 @@ cc_library(
srcs = glob(
[
"src/ray/core_worker/*.cc",
"src/ray/core_worker/store_provider/*.cc",
"src/ray/core_worker/transport/*.cc",
],
exclude = [
"src/ray/core_worker/*_test.cc",
@@ -119,6 +121,8 @@ cc_library(
),
hdrs = glob([
"src/ray/core_worker/*.h",
"src/ray/core_worker/store_provider/*.h",
"src/ray/core_worker/transport/*.h",
]),
copts = COPTS,
deps = [
+31
View File
@@ -5,6 +5,8 @@
#include <utility>
#include "ray/common/buffer.h"
#include "ray/common/id.h"
#include "ray/raylet/raylet_client.h"
#include "ray/raylet/task_spec.h"
namespace ray {
@@ -66,6 +68,35 @@ class TaskArg {
const std::shared_ptr<Buffer> data_;
};
/// Task specification, which includes the immutable information about the task
/// which are determined at the submission time.
/// TODO(zhijunfu): this can be removed after everything is moved to protobuf.
class TaskSpec {
public:
TaskSpec(const raylet::TaskSpecification &task_spec,
const std::vector<ObjectID> &dependencies)
: task_spec_(task_spec), dependencies_(dependencies) {}
TaskSpec(const raylet::TaskSpecification &&task_spec,
const std::vector<ObjectID> &&dependencies)
: task_spec_(task_spec), dependencies_(dependencies) {}
const raylet::TaskSpecification &GetTaskSpecification() const { return task_spec_; }
const std::vector<ObjectID> &GetDependencies() const { return dependencies_; }
private:
/// Raylet task specification.
raylet::TaskSpecification task_spec_;
/// Dependencies.
std::vector<ObjectID> dependencies_;
};
/// Kinds of store provider; PLASMA is the only one defined here.
enum class StoreProviderType { PLASMA };
/// Kinds of task transport; RAYLET is the only one defined here.
enum class TaskTransportType { RAYLET };
} // namespace ray
#endif // RAY_CORE_WORKER_COMMON_H
+21 -23
View File
@@ -9,41 +9,39 @@ CoreWorker::CoreWorker(const enum WorkerType worker_type,
DriverID driver_id)
: worker_type_(worker_type),
language_(language),
worker_context_(worker_type, driver_id),
store_socket_(store_socket),
raylet_socket_(raylet_socket),
is_initialized_(false),
worker_context_(worker_type, driver_id),
raylet_client_(raylet_socket_, worker_context_.GetWorkerID(),
(worker_type_ == ray::WorkerType::WORKER),
worker_context_.GetCurrentDriverID(), ToTaskLanguage(language_)),
task_interface_(*this),
object_interface_(*this),
task_execution_interface_(*this) {
switch (language_) {
// TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot
// connect to Raylet after a number of retries, this needs to be changed
// so that the worker (java/python .etc) can retrieve and handle the error
// instead of crashing.
auto status = store_client_.Connect(store_socket_);
if (!status.ok()) {
RAY_LOG(ERROR) << "Connecting plasma store failed when trying to construct"
<< " core worker: " << status.message();
throw std::runtime_error(status.message());
}
}
::Language CoreWorker::ToTaskLanguage(WorkerLanguage language) {
switch (language) {
case ray::WorkerLanguage::JAVA:
task_language_ = ::Language::JAVA;
return ::Language::JAVA;
break;
case ray::WorkerLanguage::PYTHON:
task_language_ = ::Language::PYTHON;
return ::Language::PYTHON;
break;
default:
RAY_LOG(FATAL) << "Unsupported worker language: " << static_cast<int>(language_);
RAY_LOG(FATAL) << "invalid language specified: " << static_cast<int>(language);
break;
}
}
Status CoreWorker::Connect() {
// connect to plasma.
RAY_ARROW_RETURN_NOT_OK(store_client_.Connect(store_socket_));
// connect to raylet.
// TODO: currently RayletClient would crash in its constructor if it cannot
// connect to Raylet after a number of retries, this needs to be changed
// so that the worker (java/python .etc) can retrieve and handle the error
// instead of crashing.
raylet_client_ = std::unique_ptr<RayletClient>(
new RayletClient(raylet_socket_, worker_context_.GetWorkerID(),
(worker_type_ == ray::WorkerType::WORKER),
worker_context_.GetCurrentDriverID(), task_language_));
is_initialized_ = true;
return Status::OK();
}
} // namespace ray
+14 -15
View File
@@ -20,13 +20,12 @@ class CoreWorker {
///
/// \param[in] worker_type Type of this worker.
/// \param[in] language Language of this worker.
///
/// NOTE(zhijunfu): the constructor would throw if a failure happens.
CoreWorker(const WorkerType worker_type, const WorkerLanguage language,
const std::string &store_socket, const std::string &raylet_socket,
DriverID driver_id = DriverID::Nil());
/// Connect to raylet.
Status Connect();
/// Type of this worker.
enum WorkerType WorkerType() const { return worker_type_; }
@@ -46,23 +45,26 @@ class CoreWorker {
CoreWorkerTaskExecutionInterface &Execution() { return task_execution_interface_; }
private:
/// Translate from WorkerLanguage to Language type (required by raylet client).
///
/// \param[in] language Language for a task.
/// \return Translated task language.
::Language ToTaskLanguage(WorkerLanguage language);
/// Type of this worker.
const enum WorkerType worker_type_;
/// Language of this worker.
const enum WorkerLanguage language_;
/// Language of this worker as specified in flatbuf (used by task spec).
::Language task_language_;
/// Worker context per thread.
WorkerContext worker_context_;
/// Plasma store socket name.
std::string store_socket_;
const std::string store_socket_;
/// raylet socket name.
std::string raylet_socket_;
const std::string raylet_socket_;
/// Worker context.
WorkerContext worker_context_;
/// Plasma store client.
plasma::PlasmaClient store_client_;
@@ -71,10 +73,7 @@ class CoreWorker {
std::mutex store_client_mutex_;
/// Raylet client.
std::unique_ptr<RayletClient> raylet_client_;
/// Whether this worker has been initialized.
bool is_initialized_;
RayletClient raylet_client_;
/// The `CoreWorkerTaskInterface` instance.
CoreWorkerTaskInterface task_interface_;
+9 -13
View File
@@ -128,8 +128,6 @@ class CoreWorkerTest : public ::testing::Test {
raylet_store_socket_names_[0], raylet_socket_names_[0],
DriverID::FromRandom());
RAY_CHECK_OK(driver.Connect());
// Test pass by value.
{
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
@@ -187,7 +185,6 @@ class CoreWorkerTest : public ::testing::Test {
CoreWorker driver(WorkerType::DRIVER, WorkerLanguage::PYTHON,
raylet_store_socket_names_[0], raylet_socket_names_[0],
DriverID::FromRandom());
RAY_CHECK_OK(driver.Connect());
std::unique_ptr<ActorHandle> actor_handle;
@@ -277,13 +274,6 @@ TEST_F(ZeroNodeTest, TestTaskArg) {
ASSERT_EQ(*data, *buffer);
}
TEST_F(ZeroNodeTest, TestAttributeGetters) {
CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, "", "",
DriverID::FromRandom());
ASSERT_EQ(core_worker.WorkerType(), WorkerType::DRIVER);
ASSERT_EQ(core_worker.Language(), WorkerLanguage::PYTHON);
}
TEST_F(ZeroNodeTest, TestWorkerContext) {
auto driver_id = DriverID::FromRandom();
@@ -313,7 +303,6 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON,
raylet_store_socket_names_[0], raylet_socket_names_[0],
DriverID::FromRandom());
RAY_CHECK_OK(core_worker.Connect());
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
uint8_t array2[] = {10, 11, 12, 13, 14, 15};
@@ -370,12 +359,10 @@ TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) {
CoreWorker worker1(WorkerType::DRIVER, WorkerLanguage::PYTHON,
raylet_store_socket_names_[0], raylet_socket_names_[0],
DriverID::FromRandom());
RAY_CHECK_OK(worker1.Connect());
CoreWorker worker2(WorkerType::DRIVER, WorkerLanguage::PYTHON,
raylet_store_socket_names_[1], raylet_socket_names_[1],
DriverID::FromRandom());
RAY_CHECK_OK(worker2.Connect());
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
uint8_t array2[] = {10, 11, 12, 13, 14, 15};
@@ -456,6 +443,15 @@ TEST_F(TwoNodeTest, TestActorTaskCrossNodes) {
TestActorTask(resources);
}
// Constructing a core worker with an empty plasma store socket name is expected to
// fail by throwing. The test records whether the exception was actually seen and
// asserts on it; without that assertion the test would also pass when the
// constructor unexpectedly succeeds.
TEST_F(SingleNodeTest, TestCoreWorkerConstructorFailure) {
  bool construction_failed = false;
  try {
    CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, "",
                           raylet_socket_names_[0], DriverID::FromRandom());
  } catch (const std::exception &e) {
    construction_failed = true;
    std::cout << "Caught exception when constructing core worker: " << e.what()
              << std::endl;
  }
  ASSERT_TRUE(construction_failed);
}
} // namespace ray
int main(int argc, char **argv) {
+1 -3
View File
@@ -17,9 +17,7 @@ class MockWorker {
public:
MockWorker(const std::string &store_socket, const std::string &raylet_socket)
: worker_(WorkerType::WORKER, WorkerLanguage::PYTHON, store_socket, raylet_socket,
DriverID::FromRandom()) {
RAY_CHECK_OK(worker_.Connect());
}
DriverID::FromRandom()) {}
void Run() {
auto executor_func = [this](const RayFunction &ray_function,
+19 -108
View File
@@ -2,11 +2,18 @@
#include "ray/common/ray_config.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
#include "ray/core_worker/store_provider/plasma_store_provider.h"
namespace ray {
CoreWorkerObjectInterface::CoreWorkerObjectInterface(CoreWorker &core_worker)
: core_worker_(core_worker) {}
: core_worker_(core_worker) {
store_providers_.emplace(
static_cast<int>(StoreProviderType::PLASMA),
std::unique_ptr<CoreWorkerStoreProvider>(new CoreWorkerPlasmaStoreProvider(
core_worker_.store_client_, core_worker_.store_client_mutex_,
core_worker_.raylet_client_)));
}
Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) {
ObjectID put_id = ObjectID::ForPut(core_worker_.worker_context_.GetCurrentTaskID(),
@@ -16,127 +23,31 @@ Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id)
}
Status CoreWorkerObjectInterface::Put(const Buffer &buffer, const ObjectID &object_id) {
auto plasma_id = object_id.ToPlasmaId();
std::shared_ptr<arrow::Buffer> data;
{
std::unique_lock<std::mutex> guard(core_worker_.store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK(
core_worker_.store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data));
}
memcpy(data->mutable_data(), buffer.Data(), buffer.Size());
{
std::unique_lock<std::mutex> guard(core_worker_.store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Seal(plasma_id));
RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Release(plasma_id));
}
return Status::OK();
auto type = static_cast<int>(StoreProviderType::PLASMA);
return store_providers_[type]->Put(buffer, object_id);
}
Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
int64_t timeout_ms,
std::vector<std::shared_ptr<Buffer>> *results) {
(*results).resize(ids.size(), nullptr);
bool was_blocked = false;
std::unordered_map<ObjectID, int> unready;
for (size_t i = 0; i < ids.size(); i++) {
unready.insert({ids[i], i});
}
int num_attempts = 0;
bool should_break = false;
int64_t remaining_timeout = timeout_ms;
// Repeat until we get all objects.
while (!unready.empty() && !should_break) {
std::vector<ObjectID> unready_ids;
for (const auto &entry : unready) {
unready_ids.push_back(entry.first);
}
// For the initial fetch, we only fetch the objects, do not reconstruct them.
bool fetch_only = num_attempts == 0;
if (!fetch_only) {
// If fetch_only is false, this worker will be blocked.
was_blocked = true;
}
// TODO: can call `fetchOrReconstruct` in batches as an optimization.
RAY_CHECK_OK(core_worker_.raylet_client_->FetchOrReconstruct(
unready_ids, fetch_only, core_worker_.worker_context_.GetCurrentTaskID()));
// Get the objects from the object store, and parse the result.
int64_t get_timeout;
if (remaining_timeout >= 0) {
get_timeout =
std::min(remaining_timeout, RayConfig::instance().get_timeout_milliseconds());
remaining_timeout -= get_timeout;
should_break = remaining_timeout <= 0;
} else {
get_timeout = RayConfig::instance().get_timeout_milliseconds();
}
std::vector<plasma::ObjectID> plasma_ids;
for (const auto &id : unready_ids) {
plasma_ids.push_back(id.ToPlasmaId());
}
std::vector<plasma::ObjectBuffer> object_buffers;
{
std::unique_lock<std::mutex> guard(core_worker_.store_client_mutex_);
auto status =
core_worker_.store_client_.Get(plasma_ids, get_timeout, &object_buffers);
}
for (size_t i = 0; i < object_buffers.size(); i++) {
if (object_buffers[i].data != nullptr) {
const auto &object_id = unready_ids[i];
(*results)[unready[object_id]] =
std::make_shared<PlasmaBuffer>(object_buffers[i].data);
unready.erase(object_id);
}
}
num_attempts += 1;
// TODO: log a message if attempted too many times.
}
if (was_blocked) {
RAY_CHECK_OK(core_worker_.raylet_client_->NotifyUnblocked(
core_worker_.worker_context_.GetCurrentTaskID()));
}
return Status::OK();
auto type = static_cast<int>(StoreProviderType::PLASMA);
return store_providers_[type]->Get(
ids, timeout_ms, core_worker_.worker_context_.GetCurrentTaskID(), results);
}
Status CoreWorkerObjectInterface::Wait(const std::vector<ObjectID> &object_ids,
int num_objects, int64_t timeout_ms,
std::vector<bool> *results) {
WaitResultPair result_pair;
auto status = core_worker_.raylet_client_->Wait(
object_ids, num_objects, timeout_ms, false,
core_worker_.worker_context_.GetCurrentTaskID(), &result_pair);
std::unordered_set<ObjectID> ready_ids;
for (const auto &entry : result_pair.first) {
ready_ids.insert(entry);
}
// TODO: change RayletClient::Wait() to return a bit set, so that we don't need
// to do this translation.
(*results).resize(object_ids.size());
for (size_t i = 0; i < object_ids.size(); i++) {
(*results)[i] = ready_ids.count(object_ids[i]) > 0;
}
return status;
auto type = static_cast<int>(StoreProviderType::PLASMA);
return store_providers_[type]->Wait(object_ids, num_objects, timeout_ms,
core_worker_.worker_context_.GetCurrentTaskID(),
results);
}
Status CoreWorkerObjectInterface::Delete(const std::vector<ObjectID> &object_ids,
bool local_only, bool delete_creating_tasks) {
return core_worker_.raylet_client_->FreeObjects(object_ids, local_only,
delete_creating_tasks);
auto type = static_cast<int>(StoreProviderType::PLASMA);
return store_providers_[type]->Delete(object_ids, local_only, delete_creating_tasks);
}
} // namespace ray
+5
View File
@@ -6,10 +6,12 @@
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/store_provider/store_provider.h"
namespace ray {
class CoreWorker;
class CoreWorkerStoreProvider;
/// The interface that contains all `CoreWorker` methods that are related to object store.
class CoreWorkerObjectInterface {
@@ -62,6 +64,9 @@ class CoreWorkerObjectInterface {
private:
/// Reference to the parent CoreWorker instance.
CoreWorker &core_worker_;
/// All the store providers supported.
std::unordered_map<int, std::unique_ptr<CoreWorkerStoreProvider>> store_providers_;
};
} // namespace ray
@@ -0,0 +1,139 @@
#include "ray/core_worker/store_provider/plasma_store_provider.h"
#include "ray/common/ray_config.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
#include "ray/core_worker/object_interface.h"
namespace ray {
/// Construct a plasma store provider on top of an existing plasma client, the mutex
/// that guards it, and a raylet client. The arguments are only bound to members
/// here; nothing is connected or created.
CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
plasma::PlasmaClient &store_client, std::mutex &store_client_mutex,
RayletClient &raylet_client)
: store_client_(store_client),
store_client_mutex_(store_client_mutex),
raylet_client_(raylet_client) {}
/// Store `buffer` in plasma under `object_id`: create the object, copy the bytes
/// into it, then seal and release it.
///
/// \param[in] buffer Data buffer of the object.
/// \param[in] object_id Object ID specified by user.
/// \return Status; the first failing plasma call is returned as an error.
Status CoreWorkerPlasmaStoreProvider::Put(const Buffer &buffer,
                                          const ObjectID &object_id) {
  auto plasma_object_id = object_id.ToPlasmaId();
  std::shared_ptr<arrow::Buffer> store_buffer;

  // Step 1: allocate the object. The plasma client is only used under the mutex.
  {
    std::lock_guard<std::mutex> lock(store_client_mutex_);
    RAY_ARROW_RETURN_NOT_OK(store_client_.Create(plasma_object_id, buffer.Size(),
                                                 nullptr, 0, &store_buffer));
  }

  // Step 2: copy the payload. This is done without holding the mutex.
  memcpy(store_buffer->mutable_data(), buffer.Data(), buffer.Size());

  // Step 3: seal the object and release it, again under the mutex.
  {
    std::lock_guard<std::mutex> lock(store_client_mutex_);
    RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(plasma_object_id));
    RAY_ARROW_RETURN_NOT_OK(store_client_.Release(plasma_object_id));
  }

  return Status::OK();
}
/// Get a list of objects from the plasma store, asking raylet to fetch them (and,
/// from the second attempt on, to reconstruct them) until all are available or
/// the timeout is used up.
///
/// `results` is resized to `ids.size()`; entry i corresponds to `ids[i]`. Entries
/// for objects that were not obtained are left as they were after the resize.
/// An ID that appears several times in `ids` gets the same buffer at every one of
/// its positions.
///
/// \param[in] ids IDs of the objects to get.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
/// \param[in] task_id ID for the current task.
/// \param[out] results Result list of objects data.
/// \return Status; an IOError if reading from the plasma store failed.
Status CoreWorkerPlasmaStoreProvider::Get(const std::vector<ObjectID> &ids,
                                          int64_t timeout_ms, const TaskID &task_id,
                                          std::vector<std::shared_ptr<Buffer>> *results) {
  (*results).resize(ids.size(), nullptr);

  // Map every object that is still missing to ALL positions it occupies in `ids`.
  // Keeping a single index per ID would leave the slots of duplicated IDs empty.
  std::unordered_map<ObjectID, std::vector<size_t>> unready;
  for (size_t i = 0; i < ids.size(); i++) {
    unready[ids[i]].push_back(i);
  }

  bool was_blocked = false;
  int num_attempts = 0;
  bool should_break = false;
  int64_t remaining_timeout = timeout_ms;
  Status store_status = Status::OK();

  // Repeat until we get all objects.
  while (!unready.empty() && !should_break) {
    std::vector<ObjectID> unready_ids;
    unready_ids.reserve(unready.size());
    for (const auto &entry : unready) {
      unready_ids.push_back(entry.first);
    }

    // For the initial fetch, we only fetch the objects, do not reconstruct them.
    bool fetch_only = num_attempts == 0;
    if (!fetch_only) {
      // If fetch_only is false, this worker will be blocked.
      was_blocked = true;
    }

    // TODO(zhijunfu): can call `fetchOrReconstruct` in batches as an optimization.
    RAY_CHECK_OK(raylet_client_.FetchOrReconstruct(unready_ids, fetch_only, task_id));

    // Pick the timeout for this round: at most the configured per-get timeout, and
    // never more than what is left of the caller's budget.
    int64_t get_timeout;
    if (remaining_timeout >= 0) {
      get_timeout =
          std::min(remaining_timeout, RayConfig::instance().get_timeout_milliseconds());
      remaining_timeout -= get_timeout;
      should_break = remaining_timeout <= 0;
    } else {
      get_timeout = RayConfig::instance().get_timeout_milliseconds();
    }

    std::vector<plasma::ObjectID> plasma_ids;
    plasma_ids.reserve(unready_ids.size());
    for (const auto &id : unready_ids) {
      plasma_ids.push_back(id.ToPlasmaId());
    }

    // Get the objects from the object store. A failure here is reported to the
    // caller instead of being dropped; otherwise an unusable store connection
    // would make this loop spin for as long as the timeout allows.
    std::vector<plasma::ObjectBuffer> object_buffers;
    {
      std::unique_lock<std::mutex> guard(store_client_mutex_);
      auto plasma_status = store_client_.Get(plasma_ids, get_timeout, &object_buffers);
      if (!plasma_status.ok()) {
        store_status = Status::IOError(plasma_status.message());
      }
    }
    if (!store_status.ok()) {
      RAY_LOG(ERROR) << "Getting objects from plasma store failed with error: "
                     << store_status.message();
      break;
    }

    // Hand each object that arrived to every position that asked for it.
    for (size_t i = 0; i < object_buffers.size(); i++) {
      if (object_buffers[i].data != nullptr) {
        auto entry = unready.find(unready_ids[i]);
        std::shared_ptr<Buffer> object_data =
            std::make_shared<PlasmaBuffer>(object_buffers[i].data);
        for (size_t position : entry->second) {
          (*results)[position] = object_data;
        }
        unready.erase(entry);
      }
    }

    num_attempts += 1;
    // TODO(zhijunfu): log a message if attempted too many times.
  }

  // This is done on the error path as well, so raylet is always told once this
  // worker is no longer waiting.
  if (was_blocked) {
    RAY_CHECK_OK(raylet_client_.NotifyUnblocked(task_id));
  }

  return store_status;
}
/// Wait for objects through raylet and report, per requested object, whether it is
/// ready.
///
/// \param[in] object_ids IDs of the objects to wait for.
/// \param[in] num_objects Number of objects that should appear.
/// \param[in] timeout_ms Timeout in milliseconds, passed on to raylet.
/// \param[in] task_id ID for the current task.
/// \param[out] results Resized to `object_ids.size()`; entry i is true if
/// `object_ids[i]` was reported as ready.
/// \return The status returned by the raylet client.
Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector<ObjectID> &object_ids,
                                           int num_objects, int64_t timeout_ms,
                                           const TaskID &task_id,
                                           std::vector<bool> *results) {
  WaitResultPair wait_result;
  auto wait_status = raylet_client_.Wait(object_ids, num_objects, timeout_ms, false,
                                         task_id, &wait_result);

  // Collect the ready IDs into a set so each lookup below is cheap.
  std::unordered_set<ObjectID> ready(wait_result.first.begin(),
                                     wait_result.first.end());

  // TODO(zhijunfu): change RayletClient::Wait() to return a bit set, so that we don't
  // need to do this translation.
  results->resize(object_ids.size());
  size_t position = 0;
  for (const auto &object_id : object_ids) {
    (*results)[position] = ready.find(object_id) != ready.end();
    ++position;
  }

  return wait_status;
}
/// Delete a list of objects by forwarding the request unchanged to
/// `RayletClient::FreeObjects`.
///
/// \param[in] object_ids IDs of the objects to delete.
/// \param[in] local_only Forwarded to raylet as is.
/// \param[in] delete_creating_tasks Forwarded to raylet as is.
/// \return The status returned by the raylet client.
Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector<ObjectID> &object_ids,
bool local_only,
bool delete_creating_tasks) {
return raylet_client_.FreeObjects(object_ids, local_only, delete_creating_tasks);
}
} // namespace ray
@@ -0,0 +1,76 @@
#ifndef RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H
#define RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H
#include "plasma/client.h"
#include "ray/common/buffer.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/store_provider/store_provider.h"
#include "ray/raylet/raylet_client.h"
namespace ray {
class CoreWorker;
/// The class provides implementations for accessing plasma store, which includes both
/// local and remote store; remote access is done via raylet.
class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
public:
/// Construct a provider on top of existing clients. All three arguments are kept
/// as references (see the members below), so they must outlive this object.
///
/// \param[in] store_client Plasma store client.
/// \param[in] store_client_mutex Mutex to protect `store_client`.
/// \param[in] raylet_client Raylet client.
CoreWorkerPlasmaStoreProvider(plasma::PlasmaClient &store_client,
std::mutex &store_client_mutex,
RayletClient &raylet_client);
/// Put an object with specified ID into object store.
///
/// \param[in] buffer Data buffer of the object.
/// \param[in] object_id Object ID specified by user.
/// \return Status.
Status Put(const Buffer &buffer, const ObjectID &object_id) override;
/// Get a list of objects from the object store.
///
/// \param[in] ids IDs of the objects to get.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
/// \param[in] task_id ID for the current task.
/// \param[out] results Result list of objects data.
/// \return Status.
Status Get(const std::vector<ObjectID> &ids, int64_t timeout_ms, const TaskID &task_id,
std::vector<std::shared_ptr<Buffer>> *results) override;
/// Wait for a list of objects to appear in the object store.
///
/// \param[in] object_ids IDs of the objects to wait for.
/// \param[in] num_objects Number of objects that should appear.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
/// \param[in] task_id ID for the current task.
/// \param[out] results One entry per ID in `object_ids`, in the same order,
/// indicating whether that object has appeared or not.
/// \return Status.
Status Wait(const std::vector<ObjectID> &object_ids, int num_objects,
int64_t timeout_ms, const TaskID &task_id,
std::vector<bool> *results) override;
/// Delete a list of objects from the object store.
///
/// \param[in] object_ids IDs of the objects to delete.
/// \param[in] local_only Whether only delete the objects in local node, or all nodes in
/// the cluster.
/// \param[in] delete_creating_tasks Whether also delete the tasks that
/// created these objects.
/// \return Status.
Status Delete(const std::vector<ObjectID> &object_ids, bool local_only,
bool delete_creating_tasks) override;
private:
/// Plasma store client.
plasma::PlasmaClient &store_client_;
/// Mutex to protect store_client_.
std::mutex &store_client_mutex_;
/// Raylet client.
RayletClient &raylet_client_;
};
} // namespace ray
#endif // RAY_CORE_WORKER_PLASMA_STORE_PROVIDER_H
@@ -0,0 +1,64 @@
#ifndef RAY_CORE_WORKER_STORE_PROVIDER_H
#define RAY_CORE_WORKER_STORE_PROVIDER_H
#include "ray/common/buffer.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
namespace ray {
/// Provider interface for store access. Store providers should inherit from this class
/// and provide implementations for the methods. The actual store provider may use a
/// plasma store or local memory store in worker process, or possibly other types of
/// storage.
class CoreWorkerStoreProvider {
public:
CoreWorkerStoreProvider() {}
/// Virtual so that a concrete provider can be destroyed through a pointer to this
/// interface.
virtual ~CoreWorkerStoreProvider() {}
/// Put an object with specified ID into object store.
///
/// \param[in] buffer Data buffer of the object.
/// \param[in] object_id Object ID specified by user.
/// \return Status.
virtual Status Put(const Buffer &buffer, const ObjectID &object_id) = 0;
/// Get a list of objects from the object store.
///
/// \param[in] ids IDs of the objects to get.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
/// \param[in] task_id ID for the current task.
/// \param[out] results Result list of objects data.
/// \return Status.
virtual Status Get(const std::vector<ObjectID> &ids, int64_t timeout_ms,
const TaskID &task_id,
std::vector<std::shared_ptr<Buffer>> *results) = 0;
/// Wait for a list of objects to appear in the object store.
///
/// \param[in] object_ids IDs of the objects to wait for.
/// \param[in] num_objects Number of objects that should appear.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
/// \param[in] task_id ID for the current task.
/// \param[out] results One entry per ID in `object_ids`, indicating whether that
/// object has appeared or not.
/// \return Status.
virtual Status Wait(const std::vector<ObjectID> &object_ids, int num_objects,
int64_t timeout_ms, const TaskID &task_id,
std::vector<bool> *results) = 0;
/// Delete a list of objects from the object store.
///
/// \param[in] object_ids IDs of the objects to delete.
/// \param[in] local_only Whether only delete the objects in local node, or all nodes in
/// the cluster.
/// \param[in] delete_creating_tasks Whether also delete the tasks that
/// created these objects.
/// \return Status.
virtual Status Delete(const std::vector<ObjectID> &object_ids, bool local_only,
bool delete_creating_tasks) = 0;
};
} // namespace ray
#endif // RAY_CORE_WORKER_STORE_PROVIDER_H
+34 -23
View File
@@ -1,43 +1,54 @@
#include "ray/core_worker/task_execution.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
#include "ray/core_worker/transport/raylet_transport.h"
namespace ray {
Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) {
RAY_CHECK(core_worker_.is_initialized_);
/// Bind to the parent core worker and register the raylet task receiver, which is
/// built on the core worker's raylet client, under the RAYLET transport type.
CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface(
CoreWorker &core_worker)
: core_worker_(core_worker) {
// Receivers are keyed by the integer value of their TaskTransportType.
task_receivers.emplace(
static_cast<int>(TaskTransportType::RAYLET),
std::unique_ptr<CoreWorkerRayletTaskReceiver>(
new CoreWorkerRayletTaskReceiver(core_worker_.raylet_client_)));
}
Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) {
while (true) {
std::unique_ptr<raylet::TaskSpecification> task_spec;
auto status = core_worker_.raylet_client_->GetTask(&task_spec);
std::vector<TaskSpec> tasks;
auto status =
task_receivers[static_cast<int>(TaskTransportType::RAYLET)]->GetTasks(&tasks);
if (!status.ok()) {
RAY_LOG(ERROR) << "Get task failed with error: "
RAY_LOG(ERROR) << "Getting task failed with error: "
<< ray::Status::IOError(status.message());
return status;
}
const auto &spec = *task_spec;
core_worker_.worker_context_.SetCurrentTask(spec);
for (const auto &task : tasks) {
const auto &spec = task.GetTaskSpecification();
core_worker_.worker_context_.SetCurrentTask(spec);
WorkerLanguage language = (spec.GetLanguage() == ::Language::JAVA)
? WorkerLanguage::JAVA
: WorkerLanguage::PYTHON;
RayFunction func{language, spec.FunctionDescriptor()};
WorkerLanguage language = (spec.GetLanguage() == ::Language::JAVA)
? WorkerLanguage::JAVA
: WorkerLanguage::PYTHON;
RayFunction func{language, spec.FunctionDescriptor()};
std::vector<std::shared_ptr<Buffer>> args;
RAY_CHECK_OK(BuildArgsForExecutor(spec, &args));
std::vector<std::shared_ptr<Buffer>> args;
RAY_CHECK_OK(BuildArgsForExecutor(spec, &args));
auto num_returns = spec.NumReturns();
if (spec.IsActorCreationTask() || spec.IsActorTask()) {
RAY_CHECK(num_returns > 0);
// Decrease to account for the dummy object id.
num_returns--;
auto num_returns = spec.NumReturns();
if (spec.IsActorCreationTask() || spec.IsActorTask()) {
RAY_CHECK(num_returns > 0);
// Decrease to account for the dummy object id.
num_returns--;
}
status = executor(func, args, spec.TaskId(), num_returns);
// TODO(zhijunfu):
// 1. Check and handle failure.
// 2. Save or load checkpoint.
}
status = executor(func, args, spec.TaskId(), num_returns);
// TODO:
// 1. Check and handle failure.
// 2. Save or load checkpoint.
}
// should never reach here.
+5 -2
View File
@@ -4,6 +4,7 @@
#include "ray/common/buffer.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/transport/transport.h"
namespace ray {
@@ -17,8 +18,7 @@ class TaskSpecification;
/// execution.
class CoreWorkerTaskExecutionInterface {
public:
CoreWorkerTaskExecutionInterface(CoreWorker &core_worker) : core_worker_(core_worker) {}
CoreWorkerTaskExecutionInterface(CoreWorker &core_worker);
/// The callback provided app-language workers that executes tasks.
///
/// \param ray_function[in] Information about the function to execute.
@@ -46,6 +46,9 @@ class CoreWorkerTaskExecutionInterface {
/// Reference to the parent CoreWorker instance.
CoreWorker &core_worker_;
/// All the task receivers supported.
std::unordered_map<int, std::unique_ptr<CoreWorkerTaskReceiver>> task_receivers;
};
} // namespace ray
+20 -20
View File
@@ -1,9 +1,19 @@
#include "ray/core_worker/task_interface.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
#include "ray/core_worker/task_interface.h"
#include "ray/core_worker/transport/raylet_transport.h"
namespace ray {
/// Bind to the parent core worker and register the raylet task submitter, which is
/// built on the core worker's raylet client, under the RAYLET transport type.
CoreWorkerTaskInterface::CoreWorkerTaskInterface(CoreWorker &core_worker)
: core_worker_(core_worker) {
// Submitters are keyed by the integer value of their TaskTransportType.
task_submitters_.emplace(
static_cast<int>(TaskTransportType::RAYLET),
std::unique_ptr<CoreWorkerRayletTaskSubmitter>(
new CoreWorkerRayletTaskSubmitter(core_worker_.raylet_client_)));
}
Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
@@ -20,7 +30,7 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function,
}
auto task_arguments = BuildTaskArguments(args);
auto language = ToTaskLanguage(function.language);
auto language = core_worker_.ToTaskLanguage(function.language);
ray::raylet::TaskSpecification spec(context.GetCurrentDriverID(),
context.GetCurrentTaskID(), next_task_index,
@@ -28,7 +38,8 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function,
language, function.function_descriptor);
std::vector<ObjectID> execution_dependencies;
return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec);
TaskSpec task(std::move(spec), execution_dependencies);
return task_submitters_[static_cast<int>(TaskTransportType::RAYLET)]->SubmitTask(task);
}
Status CoreWorkerTaskInterface::CreateActor(
@@ -50,7 +61,7 @@ Status CoreWorkerTaskInterface::CreateActor(
(*actor_handle)->SetActorCursor(return_ids[0]);
auto task_arguments = BuildTaskArguments(args);
auto language = ToTaskLanguage(function.language);
auto language = core_worker_.ToTaskLanguage(function.language);
// Note that the caller is supposed to specify required placement resources
// correctly via actor_creation_options.resources.
@@ -62,7 +73,8 @@ Status CoreWorkerTaskInterface::CreateActor(
function.function_descriptor);
std::vector<ObjectID> execution_dependencies;
return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec);
TaskSpec task(std::move(spec), execution_dependencies);
return task_submitters_[static_cast<int>(TaskTransportType::RAYLET)]->SubmitTask(task);
}
Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
@@ -86,7 +98,7 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
ObjectID::FromBinary(actor_handle.ActorID().Binary());
auto task_arguments = BuildTaskArguments(args);
auto language = ToTaskLanguage(function.language);
auto language = core_worker_.ToTaskLanguage(function.language);
std::vector<ActorHandleID> new_actor_handles;
ray::raylet::TaskSpecification spec(
@@ -103,7 +115,9 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
actor_handle.SetActorCursor(actor_cursor);
actor_handle.ClearNewActorHandles();
auto status = core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec);
TaskSpec task(std::move(spec), execution_dependencies);
auto status =
task_submitters_[static_cast<int>(TaskTransportType::RAYLET)]->SubmitTask(task);
// remove cursor from return ids.
(*return_ids).pop_back();
@@ -127,18 +141,4 @@ CoreWorkerTaskInterface::BuildTaskArguments(const std::vector<TaskArg> &args) {
return task_arguments;
}
::Language CoreWorkerTaskInterface::ToTaskLanguage(WorkerLanguage language) {
switch (language) {
case ray::WorkerLanguage::JAVA:
return ::Language::JAVA;
break;
case ray::WorkerLanguage::PYTHON:
return ::Language::PYTHON;
break;
default:
RAY_LOG(FATAL) << "invalid language specified: " << static_cast<int>(language);
break;
}
}
} // namespace ray
+6 -8
View File
@@ -7,6 +7,7 @@
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/transport/transport.h"
#include "ray/raylet/task.h"
namespace ray {
@@ -65,11 +66,11 @@ class ActorHandle {
int IncreaseTaskCounter() { return task_counter_++; }
std::list<ray::ActorHandleID> GetNewActorHandle() {
// TODO: implement this.
// TODO(zhijunfu): implement this.
return std::list<ray::ActorHandleID>();
}
void ClearNewActorHandles() { /* TODO: implement this. */
void ClearNewActorHandles() { /* TODO(zhijunfu): implement this. */
}
private:
@@ -89,7 +90,7 @@ class ActorHandle {
/// submission.
class CoreWorkerTaskInterface {
public:
CoreWorkerTaskInterface(CoreWorker &core_worker) : core_worker_(core_worker) {}
CoreWorkerTaskInterface(CoreWorker &core_worker);
/// Submit a normal task.
///
@@ -137,11 +138,8 @@ class CoreWorkerTaskInterface {
std::vector<std::shared_ptr<raylet::TaskArgument>> BuildTaskArguments(
const std::vector<TaskArg> &args);
/// Translate from WorkLanguage to Language type (required by taks spec).
///
/// \param[in] language Language for a task.
/// \return Translated task language.
::Language ToTaskLanguage(WorkerLanguage language);
/// All the task submitters supported.
std::unordered_map<int, std::unique_ptr<CoreWorkerTaskSubmitter>> task_submitters_;
};
} // namespace ray
@@ -0,0 +1,32 @@
#include "ray/core_worker/transport/raylet_transport.h"
namespace ray {
/// Construct a submitter that sends tasks through the given raylet client.
CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter(RayletClient &raylet_client)
: raylet_client_(raylet_client) {}
/// Submit a task to raylet by passing the task's dependencies and its raylet task
/// specification to `RayletClient::SubmitTask`.
///
/// \param[in] task The task spec to submit.
/// \return The status returned by the raylet client.
Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) {
return raylet_client_.SubmitTask(task.GetDependencies(), task.GetTaskSpecification());
}
/// Construct a receiver that gets tasks from the given raylet client.
CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(RayletClient &raylet_client)
: raylet_client_(raylet_client) {}
/// Get one task from raylet and append it, with an empty dependency list, to
/// `tasks`.
///
/// \param[out] tasks Receives the task. Must be non-null and empty on entry.
/// \return Status; the raylet client's error if getting the task failed.
Status CoreWorkerRayletTaskReceiver::GetTasks(std::vector<TaskSpec> *tasks) {
  // Check the output argument before asking raylet for a task, so that a misuse by
  // the caller is detected before a task has been taken from raylet.
  RAY_CHECK(tasks != nullptr);
  RAY_CHECK((*tasks).empty());

  std::unique_ptr<raylet::TaskSpecification> task_spec;
  auto status = raylet_client_.GetTask(&task_spec);
  if (!status.ok()) {
    RAY_LOG(ERROR) << "Get task from raylet failed with error: "
                   << ray::Status::IOError(status.message());
    return status;
  }

  std::vector<ObjectID> dependencies;
  (*tasks).emplace_back(*task_spec, dependencies);

  return Status::OK();
}
} // namespace ray
@@ -0,0 +1,44 @@
#ifndef RAY_CORE_WORKER_RAYLET_TRANSPORT_H
#define RAY_CORE_WORKER_RAYLET_TRANSPORT_H
#include <list>
#include "ray/core_worker/transport/transport.h"
#include "ray/raylet/raylet_client.h"
namespace ray {
/// In raylet task submitter and receiver, a task is submitted to raylet, and possibly
/// gets forwarded to another raylet on which node the task should be executed, and
/// then a worker on that node gets this task and starts executing it.
class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter {
 public:
  /// Construct a submitter on top of an existing raylet client.
  ///
  /// Marked explicit so that a RayletClient is never silently converted into a
  /// submitter.
  ///
  /// \param[in] raylet_client Raylet client used to submit tasks. It is kept as a
  /// reference and must outlive this object.
  explicit CoreWorkerRayletTaskSubmitter(RayletClient &raylet_client);

  /// Submit a task for execution to raylet.
  ///
  /// \param[in] task The task spec to submit.
  /// \return Status.
  Status SubmitTask(const TaskSpec &task) override;

 private:
  /// Raylet client.
  RayletClient &raylet_client_;
};
/// Task receiver that gets tasks for execution from raylet.
class CoreWorkerRayletTaskReceiver : public CoreWorkerTaskReceiver {
 public:
  /// Construct a receiver on top of an existing raylet client.
  ///
  /// Marked explicit so that a RayletClient is never silently converted into a
  /// receiver.
  ///
  /// \param[in] raylet_client Raylet client used to get tasks. It is kept as a
  /// reference and must outlive this object.
  explicit CoreWorkerRayletTaskReceiver(RayletClient &raylet_client);

  /// Get tasks for execution from raylet.
  ///
  /// \param[out] tasks Receives the tasks; must be empty when passed in.
  /// \return Status.
  Status GetTasks(std::vector<TaskSpec> *tasks) override;

 private:
  /// Raylet client.
  RayletClient &raylet_client_;
};
} // namespace ray
#endif // RAY_CORE_WORKER_RAYLET_TRANSPORT_H
+41
View File
@@ -0,0 +1,41 @@
#ifndef RAY_CORE_WORKER_TRANSPORT_H
#define RAY_CORE_WORKER_TRANSPORT_H
#include <list>
#include "ray/common/buffer.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/raylet/task_spec.h"
namespace ray {
/// Interfaces for task submitter and receiver. They are separate classes but should be
/// used in pairs - one type of task submitter should be used together with the task
/// receiver of the same type, so these classes are put together in the same file.
///
/// Task submitters/receivers should inherit from these classes and provide
/// implementations for the methods. The actual task submitter/receiver can submit/get
/// tasks via raylet, or directly to/from another worker.
/// This class is responsible for submitting tasks.
class CoreWorkerTaskSubmitter {
 public:
  /// Virtual destructor. Concrete submitters are owned and destroyed through
  /// pointers to this base class, which is undefined behaviour unless the
  /// destructor is virtual.
  virtual ~CoreWorkerTaskSubmitter() {}

  /// Submit a task for execution.
  ///
  /// \param[in] task The task spec to submit.
  /// \return Status.
  virtual Status SubmitTask(const TaskSpec &task) = 0;
};
/// This class receives tasks for execution.
class CoreWorkerTaskReceiver {
 public:
  /// Virtual destructor. Concrete receivers are owned and destroyed through
  /// pointers to this base class, which is undefined behaviour unless the
  /// destructor is virtual.
  virtual ~CoreWorkerTaskReceiver() {}

  /// Get tasks for execution.
  ///
  /// \param[out] tasks Receives the tasks to execute.
  /// \return Status.
  virtual Status GetTasks(std::vector<TaskSpec> *tasks) = 0;
};
} // namespace ray
#endif // RAY_CORE_WORKER_TRANSPORT_H
-3
View File
@@ -43,6 +43,3 @@ sleep 1s
bazel run //:redis-cli -- -p 6379 shutdown
bazel run //:redis-cli -- -p 6380 shutdown
sleep 1s
# Include raylet integration test once it's ready.
# ./bazel-bin/object_manager_integration_test $STORE_EXEC