From 4ccb7b05cc3a7db24e0615dc69ec36ddfecaea5d Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Sat, 29 Jun 2019 02:35:03 +0800 Subject: [PATCH] [Core worker] Add metadata support in object interface (#5031) --- src/ray/core_worker/core_worker_test.cc | 61 +++++++++++-------- src/ray/core_worker/mock_worker.cc | 11 ++-- src/ray/core_worker/object_interface.cc | 11 ++-- src/ray/core_worker/object_interface.h | 10 +-- .../store_provider/plasma_store_provider.cc | 24 +++++--- .../store_provider/plasma_store_provider.h | 6 +- .../store_provider/store_provider.h | 29 ++++++++- src/ray/core_worker/task_execution.cc | 13 ++-- src/ray/core_worker/task_execution.h | 10 +-- 9 files changed, 112 insertions(+), 63 deletions(-) diff --git a/src/ray/core_worker/core_worker_test.cc b/src/ray/core_worker/core_worker_test.cc index 6e866fb00..773e3f26d 100644 --- a/src/ray/core_worker/core_worker_test.cc +++ b/src/ray/core_worker/core_worker_test.cc @@ -145,12 +145,13 @@ class CoreWorkerTest : public ::testing::Test { ASSERT_EQ(return_ids.size(), 1); - std::vector> results; + std::vector> results; RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); ASSERT_EQ(results.size(), 1); - ASSERT_EQ(results[0]->Size(), buffer1->Size()); - ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0); + ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size()); + ASSERT_EQ(memcmp(results[0]->GetData()->Data(), buffer1->Data(), buffer1->Size()), + 0); } // Test pass by reference. @@ -159,7 +160,7 @@ class CoreWorkerTest : public ::testing::Test { auto buffer1 = std::make_shared(array1, sizeof(array1)); ObjectID object_id; - RAY_CHECK_OK(driver.Objects().Put(*buffer1, &object_id)); + RAY_CHECK_OK(driver.Objects().Put(RayObject(buffer1, nullptr), &object_id)); std::vector args; args.emplace_back(TaskArg::PassByReference(object_id)); @@ -172,12 +173,13 @@ class CoreWorkerTest : public ::testing::Test { ASSERT_EQ(return_ids.size(), 1); - std::vector> results; + std::vector> results; RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); ASSERT_EQ(results.size(), 1); - ASSERT_EQ(results[0]->Size(), buffer1->Size()); - ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0); + ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size()); + ASSERT_EQ(memcmp(results[0]->GetData()->Data(), buffer1->Data(), buffer1->Size()), + 0); } } @@ -212,7 +214,7 @@ class CoreWorkerTest : public ::testing::Test { auto buffer2 = std::make_shared(array2, sizeof(array2)); ObjectID object_id; - RAY_CHECK_OK(driver.Objects().Put(*buffer1, &object_id)); + RAY_CHECK_OK(driver.Objects().Put(RayObject(buffer1, nullptr), &object_id)); // Create arguments with PassByRef and PassByValue. std::vector args; @@ -226,15 +228,16 @@ class CoreWorkerTest : public ::testing::Test { &return_ids)); RAY_CHECK(return_ids.size() == 1); - std::vector> results; + std::vector> results; RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); ASSERT_EQ(results.size(), 1); - ASSERT_EQ(results[0]->Size(), buffer1->Size() + buffer2->Size()); - ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0); - ASSERT_EQ( - memcmp(results[0]->Data() + buffer1->Size(), buffer2->Data(), buffer2->Size()), - 0); + ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size() + buffer2->Size()); + ASSERT_EQ(memcmp(results[0]->GetData()->Data(), buffer1->Data(), buffer1->Size()), + 0); + ASSERT_EQ(memcmp(results[0]->GetData()->Data() + buffer1->Size(), buffer2->Data(), + buffer2->Size()), + 0); } } @@ -307,9 +310,11 @@ TEST_F(SingleNodeTest, TestObjectInterface) { uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; uint8_t array2[] = {10, 11, 12, 13, 14, 15}; - std::vector buffers; - buffers.emplace_back(array1, sizeof(array1)); - buffers.emplace_back(array2, sizeof(array2)); + std::vector buffers; + buffers.emplace_back(std::make_shared(array1, sizeof(array1)), + std::make_shared(array1, sizeof(array1) / 2)); + buffers.emplace_back(std::make_shared(array2, sizeof(array2)), + std::make_shared(array2, sizeof(array2) / 2)); std::vector ids(buffers.size()); for (size_t i = 0; i < ids.size(); i++) { @@ -317,13 +322,19 @@ TEST_F(SingleNodeTest, TestObjectInterface) { } // Test Get(). - std::vector> results; + std::vector> results; RAY_CHECK_OK(core_worker.Objects().Get(ids, -1, &results)); ASSERT_EQ(results.size(), 2); for (size_t i = 0; i < ids.size(); i++) { - ASSERT_EQ(results[i]->Size(), buffers[i].Size()); - ASSERT_EQ(memcmp(results[i]->Data(), buffers[i].Data(), buffers[i].Size()), 0); + ASSERT_EQ(results[i]->GetData()->Size(), buffers[i].GetData()->Size()); + ASSERT_EQ(memcmp(results[i]->GetData()->Data(), buffers[i].GetData()->Data(), + buffers[i].GetData()->Size()), + 0); + ASSERT_EQ(results[i]->GetMetadata()->Size(), buffers[i].GetMetadata()->Size()); + ASSERT_EQ(memcmp(results[i]->GetMetadata()->Data(), buffers[i].GetMetadata()->Data(), + buffers[i].GetMetadata()->Size()), + 0); } // Test Wait(). @@ -373,17 +384,19 @@ TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { std::vector ids(buffers.size()); for (size_t i = 0; i < ids.size(); i++) { - RAY_CHECK_OK(worker1.Objects().Put(buffers[i], &ids[i])); + RAY_CHECK_OK(worker1.Objects().Put( + RayObject(std::make_shared(buffers[i]), nullptr), &ids[i])); } // Test Get() from remote node. - std::vector> results; + std::vector> results; RAY_CHECK_OK(worker2.Objects().Get(ids, -1, &results)); ASSERT_EQ(results.size(), 2); for (size_t i = 0; i < ids.size(); i++) { - ASSERT_EQ(results[i]->Size(), buffers[i].Size()); - ASSERT_EQ(memcmp(results[i]->Data(), buffers[i].Data(), buffers[i].Size()), 0); + ASSERT_EQ(results[i]->GetData()->Size(), buffers[i].Size()); + ASSERT_EQ(memcmp(results[i]->GetData()->Data(), buffers[i].Data(), buffers[i].Size()), + 0); } // Test Wait() from remote node. diff --git a/src/ray/core_worker/mock_worker.cc b/src/ray/core_worker/mock_worker.cc index eececbe4f..e08cb4bcc 100644 --- a/src/ray/core_worker/mock_worker.cc +++ b/src/ray/core_worker/mock_worker.cc @@ -1,5 +1,6 @@ #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" +#include "ray/core_worker/store_provider/store_provider.h" #include "ray/core_worker/task_execution.h" namespace ray { @@ -21,7 +22,7 @@ class MockWorker { void Run() { auto executor_func = [this](const RayFunction &ray_function, - const std::vector> &args, + const std::vector> &args, const TaskInfo &task_info, int num_returns) { // Note that this doesn't include dummy object id. RAY_CHECK(num_returns >= 0); @@ -29,15 +30,17 @@ class MockWorker { // Merge all the content from input args. std::vector buffer; for (const auto &arg : args) { - buffer.insert(buffer.end(), arg->Data(), arg->Data() + arg->Size()); + auto &data = arg->GetData(); + buffer.insert(buffer.end(), data->Data(), data->Data() + data->Size()); } - LocalMemoryBuffer memory_buffer(buffer.data(), buffer.size()); + auto return_value = RayObject( + std::make_shared(buffer.data(), buffer.size()), nullptr); // Write the merged content to each of return ids. for (int i = 0; i < num_returns; i++) { ObjectID id = ObjectID::ForTaskReturn(task_info.task_id, i + 1); - RAY_CHECK_OK(worker_.Objects().Put(memory_buffer, id)); + RAY_CHECK_OK(worker_.Objects().Put(return_value, id)); } return Status::OK(); }; diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 81777117c..4e3be942b 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -15,21 +15,22 @@ CoreWorkerObjectInterface::CoreWorkerObjectInterface(CoreWorker &core_worker) core_worker_.raylet_client_))); } -Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id) { +Status CoreWorkerObjectInterface::Put(const RayObject &object, ObjectID *object_id) { ObjectID put_id = ObjectID::ForPut(core_worker_.worker_context_.GetCurrentTaskID(), core_worker_.worker_context_.GetNextPutIndex()); *object_id = put_id; - return Put(buffer, put_id); + return Put(object, put_id); } -Status CoreWorkerObjectInterface::Put(const Buffer &buffer, const ObjectID &object_id) { +Status CoreWorkerObjectInterface::Put(const RayObject &object, + const ObjectID &object_id) { auto type = static_cast(StoreProviderType::PLASMA); - return store_providers_[type]->Put(buffer, object_id); + return store_providers_[type]->Put(object, object_id); } Status CoreWorkerObjectInterface::Get(const std::vector &ids, int64_t timeout_ms, - std::vector> *results) { + std::vector> *results) { auto type = static_cast(StoreProviderType::PLASMA); return store_providers_[type]->Get( ids, timeout_ms, core_worker_.worker_context_.GetCurrentTaskID(), results); diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index 35403675f..218b155e8 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -20,17 +20,17 @@ class CoreWorkerObjectInterface { /// Put an object into object store. /// - /// \param[in] buffer Data buffer of the object. + /// \param[in] object The ray object. /// \param[out] object_id Generated ID of the object. /// \return Status. - Status Put(const Buffer &buffer, ObjectID *object_id); + Status Put(const RayObject &object, ObjectID *object_id); /// Put an object with specified ID into object store. /// - /// \param[in] buffer Data buffer of the object. + /// \param[in] object The ray object. /// \param[in] object_id Object ID specified by user. /// \return Status. - Status Put(const Buffer &buffer, const ObjectID &object_id); + Status Put(const RayObject &object, const ObjectID &object_id); /// Get a list of objects from the object store. /// @@ -39,7 +39,7 @@ class CoreWorkerObjectInterface { /// \param[out] results Result list of objects data. /// \return Status. Status Get(const std::vector &ids, int64_t timeout_ms, - std::vector> *results); + std::vector> *results); /// Wait for a list of objects to appear in the object store. /// diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index b5dd91d82..17ff2da91 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -13,17 +13,20 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( store_client_mutex_(store_client_mutex), raylet_client_(raylet_client) {} -Status CoreWorkerPlasmaStoreProvider::Put(const Buffer &buffer, +Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object, const ObjectID &object_id) { auto plasma_id = object_id.ToPlasmaId(); - std::shared_ptr data; + auto data = object.GetData(); + auto metadata = object.GetMetadata(); + std::shared_ptr out_buffer; { std::unique_lock guard(store_client_mutex_); - RAY_ARROW_RETURN_NOT_OK( - store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data)); + RAY_ARROW_RETURN_NOT_OK(store_client_.Create( + plasma_id, data->Size(), metadata ? metadata->Data() : nullptr, + metadata ? metadata->Size() : 0, &out_buffer)); } - memcpy(data->mutable_data(), buffer.Data(), buffer.Size()); + memcpy(out_buffer->mutable_data(), data->Data(), data->Size()); { std::unique_lock guard(store_client_mutex_); @@ -33,9 +36,9 @@ Status CoreWorkerPlasmaStoreProvider::Put(const Buffer &buffer, return Status::OK(); } -Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, - int64_t timeout_ms, const TaskID &task_id, - std::vector> *results) { +Status CoreWorkerPlasmaStoreProvider::Get( + const std::vector &ids, int64_t timeout_ms, const TaskID &task_id, + std::vector> *results) { (*results).resize(ids.size(), nullptr); bool was_blocked = false; @@ -90,8 +93,9 @@ Status CoreWorkerPlasmaStoreProvider::Get(const std::vector &ids, for (size_t i = 0; i < object_buffers.size(); i++) { if (object_buffers[i].data != nullptr) { const auto &object_id = unready_ids[i]; - (*results)[unready[object_id]] = - std::make_shared(object_buffers[i].data); + (*results)[unready[object_id]] = std::make_shared( + std::make_shared(object_buffers[i].data), + std::make_shared(object_buffers[i].metadata)); unready.erase(object_id); } } diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 0dfce1eb1..a82b96350 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -23,10 +23,10 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { /// Put an object with specified ID into object store. /// - /// \param[in] buffer Data buffer of the object. + /// \param[in] object The ray object. /// \param[in] object_id Object ID specified by user. /// \return Status. - Status Put(const Buffer &buffer, const ObjectID &object_id) override; + Status Put(const RayObject &object, const ObjectID &object_id) override; /// Get a list of objects from the object store. /// @@ -36,7 +36,7 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { /// \param[out] results Result list of objects data. /// \return Status. Status Get(const std::vector &ids, int64_t timeout_ms, const TaskID &task_id, - std::vector> *results) override; + std::vector> *results) override; /// Wait for a list of objects to appear in the object store. /// diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h index f1521edf1..25b932437 100644 --- a/src/ray/core_worker/store_provider/store_provider.h +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -8,6 +8,29 @@ namespace ray { +/// Binary representation of ray object. +class RayObject { + public: + /// Create a ray object instance. + /// + /// \param[in] data Data of the ray object. + /// \param[in] metadata Metadata of the ray object. + RayObject(const std::shared_ptr &data, const std::shared_ptr &metadata) + : data_(data), metadata_(metadata) {} + + /// Return the data of the ray object. + const std::shared_ptr &GetData() const { return data_; }; + + /// Return the metadata of the ray object. + const std::shared_ptr &GetMetadata() const { return metadata_; }; + + private: + /// Data of the ray object. + const std::shared_ptr data_; + /// Metadata of the ray object. + const std::shared_ptr metadata_; +}; + /// Provider interface for store access. Store provider should inherit from this class and /// provide implementions for the methods. The actual store provider may use a plasma /// store or local memory store in worker process, or possibly other types of storage. @@ -20,10 +43,10 @@ class CoreWorkerStoreProvider { /// Put an object with specified ID into object store. /// - /// \param[in] buffer Data buffer of the object. + /// \param[in] object The ray object. /// \param[in] object_id Object ID specified by user. /// \return Status. - virtual Status Put(const Buffer &buffer, const ObjectID &object_id) = 0; + virtual Status Put(const RayObject &object, const ObjectID &object_id) = 0; /// Get a list of objects from the object store. /// @@ -34,7 +57,7 @@ class CoreWorkerStoreProvider { /// \return Status. virtual Status Get(const std::vector &ids, int64_t timeout_ms, const TaskID &task_id, - std::vector> *results) = 0; + std::vector> *results) = 0; /// Wait for a list of objects to appear in the object store. /// diff --git a/src/ray/core_worker/task_execution.cc b/src/ray/core_worker/task_execution.cc index fd402fa33..435be36ff 100644 --- a/src/ray/core_worker/task_execution.cc +++ b/src/ray/core_worker/task_execution.cc @@ -34,7 +34,7 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { : WorkerLanguage::PYTHON; RayFunction func{language, spec.FunctionDescriptor()}; - std::vector> args; + std::vector> args; RAY_CHECK_OK(BuildArgsForExecutor(spec, &args)); TaskType task_type; @@ -67,7 +67,8 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) { } Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( - const raylet::TaskSpecification &spec, std::vector> *args) { + const raylet::TaskSpecification &spec, + std::vector> *args) { auto num_args = spec.NumArgs(); (*args).resize(num_args); @@ -83,12 +84,14 @@ Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor( indices.push_back(i); } else { // pass by value. - (*args)[i] = std::make_shared( - const_cast(spec.ArgVal(i)), spec.ArgValLength(i)); + (*args)[i] = std::make_shared( + std::make_shared(const_cast(spec.ArgVal(i)), + spec.ArgValLength(i)), + nullptr); } } - std::vector> results; + std::vector> results; auto status = core_worker_.object_interface_.Get(object_ids_to_fetch, -1, &results); if (status.ok()) { for (size_t i = 0; i < results.size(); i++) { diff --git a/src/ray/core_worker/task_execution.h b/src/ray/core_worker/task_execution.h index f3e78f2f9..a59784691 100644 --- a/src/ray/core_worker/task_execution.h +++ b/src/ray/core_worker/task_execution.h @@ -4,6 +4,7 @@ #include "ray/common/buffer.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" +#include "ray/core_worker/store_provider/store_provider.h" #include "ray/core_worker/transport/transport.h" namespace ray { @@ -24,9 +25,10 @@ class CoreWorkerTaskExecutionInterface { /// \param ray_function[in] Information about the function to execute. /// \param args[in] Arguments of the task. /// \return Status. - using TaskExecutor = std::function> &args, - const TaskInfo &task_info, int num_returns)>; + using TaskExecutor = + std::function> &args, + const TaskInfo &task_info, int num_returns)>; /// Start receving and executes tasks in a infinite loop. /// \return Status. @@ -42,7 +44,7 @@ class CoreWorkerTaskExecutionInterface { /// \param args[out] The arguments for passing to task executor. /// Status BuildArgsForExecutor(const raylet::TaskSpecification &spec, - std::vector> *args); + std::vector> *args); /// Reference to the parent CoreWorker instance. CoreWorker &core_worker_;