[Core worker] Add metadata support in object interface (#5031)

This commit is contained in:
Kai Yang
2019-06-29 02:35:03 +08:00
committed by Philipp Moritz
parent cefbb0c94c
commit 4ccb7b05cc
9 changed files with 112 additions and 63 deletions
+37 -24
View File
@@ -145,12 +145,13 @@ class CoreWorkerTest : public ::testing::Test {
ASSERT_EQ(return_ids.size(), 1);
std::vector<std::shared_ptr<ray::Buffer>> results;
std::vector<std::shared_ptr<ray::RayObject>> results;
RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results));
ASSERT_EQ(results.size(), 1);
ASSERT_EQ(results[0]->Size(), buffer1->Size());
ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0);
ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size());
ASSERT_EQ(memcmp(results[0]->GetData()->Data(), buffer1->Data(), buffer1->Size()),
0);
}
// Test pass by reference.
@@ -159,7 +160,7 @@ class CoreWorkerTest : public ::testing::Test {
auto buffer1 = std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1));
ObjectID object_id;
RAY_CHECK_OK(driver.Objects().Put(*buffer1, &object_id));
RAY_CHECK_OK(driver.Objects().Put(RayObject(buffer1, nullptr), &object_id));
std::vector<TaskArg> args;
args.emplace_back(TaskArg::PassByReference(object_id));
@@ -172,12 +173,13 @@ class CoreWorkerTest : public ::testing::Test {
ASSERT_EQ(return_ids.size(), 1);
std::vector<std::shared_ptr<ray::Buffer>> results;
std::vector<std::shared_ptr<ray::RayObject>> results;
RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results));
ASSERT_EQ(results.size(), 1);
ASSERT_EQ(results[0]->Size(), buffer1->Size());
ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0);
ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size());
ASSERT_EQ(memcmp(results[0]->GetData()->Data(), buffer1->Data(), buffer1->Size()),
0);
}
}
@@ -212,7 +214,7 @@ class CoreWorkerTest : public ::testing::Test {
auto buffer2 = std::make_shared<LocalMemoryBuffer>(array2, sizeof(array2));
ObjectID object_id;
RAY_CHECK_OK(driver.Objects().Put(*buffer1, &object_id));
RAY_CHECK_OK(driver.Objects().Put(RayObject(buffer1, nullptr), &object_id));
// Create arguments with PassByRef and PassByValue.
std::vector<TaskArg> args;
@@ -226,15 +228,16 @@ class CoreWorkerTest : public ::testing::Test {
&return_ids));
RAY_CHECK(return_ids.size() == 1);
std::vector<std::shared_ptr<ray::Buffer>> results;
std::vector<std::shared_ptr<ray::RayObject>> results;
RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results));
ASSERT_EQ(results.size(), 1);
ASSERT_EQ(results[0]->Size(), buffer1->Size() + buffer2->Size());
ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0);
ASSERT_EQ(
memcmp(results[0]->Data() + buffer1->Size(), buffer2->Data(), buffer2->Size()),
0);
ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size() + buffer2->Size());
ASSERT_EQ(memcmp(results[0]->GetData()->Data(), buffer1->Data(), buffer1->Size()),
0);
ASSERT_EQ(memcmp(results[0]->GetData()->Data() + buffer1->Size(), buffer2->Data(),
buffer2->Size()),
0);
}
}
@@ -307,9 +310,11 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
uint8_t array2[] = {10, 11, 12, 13, 14, 15};
std::vector<LocalMemoryBuffer> buffers;
buffers.emplace_back(array1, sizeof(array1));
buffers.emplace_back(array2, sizeof(array2));
std::vector<RayObject> buffers;
buffers.emplace_back(std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1)),
std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1) / 2));
buffers.emplace_back(std::make_shared<LocalMemoryBuffer>(array2, sizeof(array2)),
std::make_shared<LocalMemoryBuffer>(array2, sizeof(array2) / 2));
std::vector<ObjectID> ids(buffers.size());
for (size_t i = 0; i < ids.size(); i++) {
@@ -317,13 +322,19 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
}
// Test Get().
std::vector<std::shared_ptr<Buffer>> results;
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(core_worker.Objects().Get(ids, -1, &results));
ASSERT_EQ(results.size(), 2);
for (size_t i = 0; i < ids.size(); i++) {
ASSERT_EQ(results[i]->Size(), buffers[i].Size());
ASSERT_EQ(memcmp(results[i]->Data(), buffers[i].Data(), buffers[i].Size()), 0);
ASSERT_EQ(results[i]->GetData()->Size(), buffers[i].GetData()->Size());
ASSERT_EQ(memcmp(results[i]->GetData()->Data(), buffers[i].GetData()->Data(),
buffers[i].GetData()->Size()),
0);
ASSERT_EQ(results[i]->GetMetadata()->Size(), buffers[i].GetMetadata()->Size());
ASSERT_EQ(memcmp(results[i]->GetMetadata()->Data(), buffers[i].GetMetadata()->Data(),
buffers[i].GetMetadata()->Size()),
0);
}
// Test Wait().
@@ -373,17 +384,19 @@ TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) {
std::vector<ObjectID> ids(buffers.size());
for (size_t i = 0; i < ids.size(); i++) {
RAY_CHECK_OK(worker1.Objects().Put(buffers[i], &ids[i]));
RAY_CHECK_OK(worker1.Objects().Put(
RayObject(std::make_shared<LocalMemoryBuffer>(buffers[i]), nullptr), &ids[i]));
}
// Test Get() from remote node.
std::vector<std::shared_ptr<Buffer>> results;
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(worker2.Objects().Get(ids, -1, &results));
ASSERT_EQ(results.size(), 2);
for (size_t i = 0; i < ids.size(); i++) {
ASSERT_EQ(results[i]->Size(), buffers[i].Size());
ASSERT_EQ(memcmp(results[i]->Data(), buffers[i].Data(), buffers[i].Size()), 0);
ASSERT_EQ(results[i]->GetData()->Size(), buffers[i].Size());
ASSERT_EQ(memcmp(results[i]->GetData()->Data(), buffers[i].Data(), buffers[i].Size()),
0);
}
// Test Wait() from remote node.
+7 -4
View File
@@ -1,5 +1,6 @@
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
#include "ray/core_worker/store_provider/store_provider.h"
#include "ray/core_worker/task_execution.h"
namespace ray {
@@ -21,7 +22,7 @@ class MockWorker {
void Run() {
auto executor_func = [this](const RayFunction &ray_function,
const std::vector<std::shared_ptr<Buffer>> &args,
const std::vector<std::shared_ptr<RayObject>> &args,
const TaskInfo &task_info, int num_returns) {
// Note that this doesn't include dummy object id.
RAY_CHECK(num_returns >= 0);
@@ -29,15 +30,17 @@ class MockWorker {
// Merge all the content from input args.
std::vector<uint8_t> buffer;
for (const auto &arg : args) {
buffer.insert(buffer.end(), arg->Data(), arg->Data() + arg->Size());
auto &data = arg->GetData();
buffer.insert(buffer.end(), data->Data(), data->Data() + data->Size());
}
LocalMemoryBuffer memory_buffer(buffer.data(), buffer.size());
auto return_value = RayObject(
std::make_shared<LocalMemoryBuffer>(buffer.data(), buffer.size()), nullptr);
// Write the merged content to each of return ids.
for (int i = 0; i < num_returns; i++) {
ObjectID id = ObjectID::ForTaskReturn(task_info.task_id, i + 1);
RAY_CHECK_OK(worker_.Objects().Put(memory_buffer, id));
RAY_CHECK_OK(worker_.Objects().Put(return_value, id));
}
return Status::OK();
};
+6 -5
View File
@@ -15,21 +15,22 @@ CoreWorkerObjectInterface::CoreWorkerObjectInterface(CoreWorker &core_worker)
core_worker_.raylet_client_)));
}
Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) {
Status CoreWorkerObjectInterface::Put(const RayObject &object, ObjectID *object_id) {
ObjectID put_id = ObjectID::ForPut(core_worker_.worker_context_.GetCurrentTaskID(),
core_worker_.worker_context_.GetNextPutIndex());
*object_id = put_id;
return Put(buffer, put_id);
return Put(object, put_id);
}
Status CoreWorkerObjectInterface::Put(const Buffer &buffer, const ObjectID &object_id) {
Status CoreWorkerObjectInterface::Put(const RayObject &object,
const ObjectID &object_id) {
auto type = static_cast<int>(StoreProviderType::PLASMA);
return store_providers_[type]->Put(buffer, object_id);
return store_providers_[type]->Put(object, object_id);
}
Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
int64_t timeout_ms,
std::vector<std::shared_ptr<Buffer>> *results) {
std::vector<std::shared_ptr<RayObject>> *results) {
auto type = static_cast<int>(StoreProviderType::PLASMA);
return store_providers_[type]->Get(
ids, timeout_ms, core_worker_.worker_context_.GetCurrentTaskID(), results);
+5 -5
View File
@@ -20,17 +20,17 @@ class CoreWorkerObjectInterface {
/// Put an object into object store.
///
/// \param[in] buffer Data buffer of the object.
/// \param[in] object The ray object.
/// \param[out] object_id Generated ID of the object.
/// \return Status.
Status Put(const Buffer &buffer, ObjectID *object_id);
Status Put(const RayObject &object, ObjectID *object_id);
/// Put an object with specified ID into object store.
///
/// \param[in] buffer Data buffer of the object.
/// \param[in] object The ray object.
/// \param[in] object_id Object ID specified by user.
/// \return Status.
Status Put(const Buffer &buffer, const ObjectID &object_id);
Status Put(const RayObject &object, const ObjectID &object_id);
/// Get a list of objects from the object store.
///
@@ -39,7 +39,7 @@ class CoreWorkerObjectInterface {
/// \param[out] results Result list of objects data.
/// \return Status.
Status Get(const std::vector<ObjectID> &ids, int64_t timeout_ms,
std::vector<std::shared_ptr<Buffer>> *results);
std::vector<std::shared_ptr<RayObject>> *results);
/// Wait for a list of objects to appear in the object store.
///
@@ -13,17 +13,20 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
store_client_mutex_(store_client_mutex),
raylet_client_(raylet_client) {}
Status CoreWorkerPlasmaStoreProvider::Put(const Buffer &buffer,
Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object,
const ObjectID &object_id) {
auto plasma_id = object_id.ToPlasmaId();
std::shared_ptr<arrow::Buffer> data;
auto data = object.GetData();
auto metadata = object.GetMetadata();
std::shared_ptr<arrow::Buffer> out_buffer;
{
std::unique_lock<std::mutex> guard(store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK(
store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data));
RAY_ARROW_RETURN_NOT_OK(store_client_.Create(
plasma_id, data->Size(), metadata ? metadata->Data() : nullptr,
metadata ? metadata->Size() : 0, &out_buffer));
}
memcpy(data->mutable_data(), buffer.Data(), buffer.Size());
memcpy(out_buffer->mutable_data(), data->Data(), data->Size());
{
std::unique_lock<std::mutex> guard(store_client_mutex_);
@@ -33,9 +36,9 @@ Status CoreWorkerPlasmaStoreProvider::Put(const Buffer &buffer,
return Status::OK();
}
Status CoreWorkerPlasmaStoreProvider::Get(const std::vector<ObjectID> &ids,
int64_t timeout_ms, const TaskID &task_id,
std::vector<std::shared_ptr<Buffer>> *results) {
Status CoreWorkerPlasmaStoreProvider::Get(
const std::vector<ObjectID> &ids, int64_t timeout_ms, const TaskID &task_id,
std::vector<std::shared_ptr<RayObject>> *results) {
(*results).resize(ids.size(), nullptr);
bool was_blocked = false;
@@ -90,8 +93,9 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector<ObjectID> &ids,
for (size_t i = 0; i < object_buffers.size(); i++) {
if (object_buffers[i].data != nullptr) {
const auto &object_id = unready_ids[i];
(*results)[unready[object_id]] =
std::make_shared<PlasmaBuffer>(object_buffers[i].data);
(*results)[unready[object_id]] = std::make_shared<RayObject>(
std::make_shared<PlasmaBuffer>(object_buffers[i].data),
std::make_shared<PlasmaBuffer>(object_buffers[i].metadata));
unready.erase(object_id);
}
}
@@ -23,10 +23,10 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
/// Put an object with specified ID into object store.
///
/// \param[in] buffer Data buffer of the object.
/// \param[in] object The ray object.
/// \param[in] object_id Object ID specified by user.
/// \return Status.
Status Put(const Buffer &buffer, const ObjectID &object_id) override;
Status Put(const RayObject &object, const ObjectID &object_id) override;
/// Get a list of objects from the object store.
///
@@ -36,7 +36,7 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
/// \param[out] results Result list of objects data.
/// \return Status.
Status Get(const std::vector<ObjectID> &ids, int64_t timeout_ms, const TaskID &task_id,
std::vector<std::shared_ptr<Buffer>> *results) override;
std::vector<std::shared_ptr<RayObject>> *results) override;
/// Wait for a list of objects to appear in the object store.
///
@@ -8,6 +8,29 @@
namespace ray {
/// Binary representation of ray object.
class RayObject {
public:
/// Create a ray object instance.
///
/// \param[in] data Data of the ray object.
/// \param[in] metadata Metadata of the ray object.
RayObject(const std::shared_ptr<Buffer> &data, const std::shared_ptr<Buffer> &metadata)
: data_(data), metadata_(metadata) {}
/// Return the data of the ray object.
const std::shared_ptr<Buffer> &GetData() const { return data_; };
/// Return the metadata of the ray object.
const std::shared_ptr<Buffer> &GetMetadata() const { return metadata_; };
private:
/// Data of the ray object.
const std::shared_ptr<Buffer> data_;
/// Metadata of the ray object.
const std::shared_ptr<Buffer> metadata_;
};
/// Provider interface for store access. Store provider should inherit from this class and
/// provide implementions for the methods. The actual store provider may use a plasma
/// store or local memory store in worker process, or possibly other types of storage.
@@ -20,10 +43,10 @@ class CoreWorkerStoreProvider {
/// Put an object with specified ID into object store.
///
/// \param[in] buffer Data buffer of the object.
/// \param[in] object The ray object.
/// \param[in] object_id Object ID specified by user.
/// \return Status.
virtual Status Put(const Buffer &buffer, const ObjectID &object_id) = 0;
virtual Status Put(const RayObject &object, const ObjectID &object_id) = 0;
/// Get a list of objects from the object store.
///
@@ -34,7 +57,7 @@ class CoreWorkerStoreProvider {
/// \return Status.
virtual Status Get(const std::vector<ObjectID> &ids, int64_t timeout_ms,
const TaskID &task_id,
std::vector<std::shared_ptr<Buffer>> *results) = 0;
std::vector<std::shared_ptr<RayObject>> *results) = 0;
/// Wait for a list of objects to appear in the object store.
///
+8 -5
View File
@@ -34,7 +34,7 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) {
: WorkerLanguage::PYTHON;
RayFunction func{language, spec.FunctionDescriptor()};
std::vector<std::shared_ptr<Buffer>> args;
std::vector<std::shared_ptr<RayObject>> args;
RAY_CHECK_OK(BuildArgsForExecutor(spec, &args));
TaskType task_type;
@@ -67,7 +67,8 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) {
}
Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor(
const raylet::TaskSpecification &spec, std::vector<std::shared_ptr<Buffer>> *args) {
const raylet::TaskSpecification &spec,
std::vector<std::shared_ptr<RayObject>> *args) {
auto num_args = spec.NumArgs();
(*args).resize(num_args);
@@ -83,12 +84,14 @@ Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor(
indices.push_back(i);
} else {
// pass by value.
(*args)[i] = std::make_shared<LocalMemoryBuffer>(
const_cast<uint8_t *>(spec.ArgVal(i)), spec.ArgValLength(i));
(*args)[i] = std::make_shared<RayObject>(
std::make_shared<LocalMemoryBuffer>(const_cast<uint8_t *>(spec.ArgVal(i)),
spec.ArgValLength(i)),
nullptr);
}
}
std::vector<std::shared_ptr<Buffer>> results;
std::vector<std::shared_ptr<RayObject>> results;
auto status = core_worker_.object_interface_.Get(object_ids_to_fetch, -1, &results);
if (status.ok()) {
for (size_t i = 0; i < results.size(); i++) {
+6 -4
View File
@@ -4,6 +4,7 @@
#include "ray/common/buffer.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/store_provider/store_provider.h"
#include "ray/core_worker/transport/transport.h"
namespace ray {
@@ -24,9 +25,10 @@ class CoreWorkerTaskExecutionInterface {
/// \param ray_function[in] Information about the function to execute.
/// \param args[in] Arguments of the task.
/// \return Status.
using TaskExecutor = std::function<Status(
const RayFunction &ray_function, const std::vector<std::shared_ptr<Buffer>> &args,
const TaskInfo &task_info, int num_returns)>;
using TaskExecutor =
std::function<Status(const RayFunction &ray_function,
const std::vector<std::shared_ptr<RayObject>> &args,
const TaskInfo &task_info, int num_returns)>;
/// Start receving and executes tasks in a infinite loop.
/// \return Status.
@@ -42,7 +44,7 @@ class CoreWorkerTaskExecutionInterface {
/// \param args[out] The arguments for passing to task executor.
///
Status BuildArgsForExecutor(const raylet::TaskSpecification &spec,
std::vector<std::shared_ptr<Buffer>> *args);
std::vector<std::shared_ptr<RayObject>> *args);
/// Reference to the parent CoreWorker instance.
CoreWorker &core_worker_;