diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 36ec9b6ee..81a42a7ee 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -3,18 +3,49 @@ #include "ray/common/ray_config.h" #include "ray/core_worker/object_interface.h" #include "ray/core_worker/store_provider/local_plasma_provider.h" +#include "ray/core_worker/store_provider/memory_store_provider.h" #include "ray/core_worker/store_provider/plasma_store_provider.h" namespace ray { +// Group object ids according the the corresponding store providers. +void GroupObjectIdsByStoreProvider( + const std::vector &object_ids, + EnumUnorderedMap> *results) { + // There are two cases: + // - for task return objects from direct actor call, use memory store provider; + // - all the others use plasma store provider. + for (const auto &object_id : object_ids) { + auto type = StoreProviderType::PLASMA; + // For raylet transport we always use plasma store provider, for direct actor call + // there are a few cases: + // - objects manually added to store by `ray.put`: for these objects they always use + // plasma store provider; + // - task arguments: these objects are passed by value, and are not put into store; + // - task return objects: these are put into memory store of the task submitter + // and are only used locally. + // Thus we need to check whether this object is a task return object in additional + // to whether it's from direct actor call before we can choose memory store provider. + if (object_id.IsReturnObject() && + object_id.GetTransportType() == + static_cast(TaskTransportType::DIRECT_ACTOR)) { + type = StoreProviderType::MEMORY; + } + + (*results)[type].insert(object_id); + } +} + CoreWorkerObjectInterface::CoreWorkerObjectInterface( WorkerContext &worker_context, std::unique_ptr &raylet_client, const std::string &store_socket) : worker_context_(worker_context), raylet_client_(raylet_client), - store_socket_(store_socket) { + store_socket_(store_socket), + memory_store_(std::make_shared()) { AddStoreProvider(StoreProviderType::LOCAL_PLASMA); AddStoreProvider(StoreProviderType::PLASMA); + AddStoreProvider(StoreProviderType::MEMORY); } Status CoreWorkerObjectInterface::Put(const RayObject &object, ObjectID *object_id) { @@ -35,33 +66,49 @@ Status CoreWorkerObjectInterface::Get(const std::vector &ids, std::vector> *results) { (*results).resize(ids.size(), nullptr); - // Divide the object ids into two groups: direct call return objects and the rest, - // and de-duplicate for each group. - std::unordered_set direct_call_return_ids; - std::unordered_set other_ids; - for (const auto &object_id : ids) { - if (object_id.IsReturnObject() && - object_id.GetTransportType() == - static_cast(TaskTransportType::DIRECT_ACTOR)) { - direct_call_return_ids.insert(object_id); + // Divide the object ids by store provider type. For each store provider, + // maintain an unordered_set which does proper de-duplication, thus the + // store provider could simply assume its object ids don't have duplicates. + EnumUnorderedMap> + object_ids_per_store_provider; + GroupObjectIdsByStoreProvider(ids, &object_ids_per_store_provider); + + std::unordered_map> objects; + auto remaining_timeout_ms = timeout_ms; + + // Re-order the list so that we always get from plasma store provider first, + // since it uses a loop of `FetchOrReconstruct` and plasma `Get`, it's not + // desirable if other store providers use up the timeout and leaves no time + // for plasma provider to reconstruct the objects as necessary. + std::list< + std::pair>>> + ids_per_provider; + for (auto &entry : object_ids_per_store_provider) { + auto list_entry = std::make_pair(entry.first, std::ref(entry.second)); + if (entry.first == StoreProviderType::PLASMA) { + ids_per_provider.emplace_front(list_entry); } else { - other_ids.insert(object_id); + ids_per_provider.emplace_back(list_entry); } } - std::unordered_map> objects; - auto start_time = current_time_ms(); - // Fetch non-direct-call objects using `PLASMA` store provider. - RAY_RETURN_NOT_OK(Get(StoreProviderType::PLASMA, other_ids, timeout_ms, &objects)); - int64_t duration = current_time_ms() - start_time; - int64_t left_timeout_ms = - (timeout_ms == -1) ? timeout_ms - : std::max(static_cast(0), timeout_ms - duration); - - // Fetch direct call return objects using `LOCAL_PLASMA` store provider. - RAY_RETURN_NOT_OK(Get(StoreProviderType::LOCAL_PLASMA, direct_call_return_ids, - left_timeout_ms, &objects)); + // Note that if one store provider uses up the timeout, we will still try the others + // with a timeout of 0. + for (const auto &entry : object_ids_per_store_provider) { + auto start_time = current_time_ms(); + RAY_RETURN_NOT_OK( + GetFromStoreProvider(entry.first, entry.second, remaining_timeout_ms, &objects)); + if (remaining_timeout_ms > 0) { + int64_t duration = current_time_ms() - start_time; + remaining_timeout_ms = + std::max(static_cast(0), remaining_timeout_ms - duration); + } + } + // Loop through `ids` and fill each entry for the `results` vector, + // this ensures that entries `results` have exactly the same order as + // they are in `ids`. When there are duplicate object ids, all the entries + // for the same id are filled in. for (size_t i = 0; i < ids.size(); i++) { (*results)[i] = objects[ids[i]]; } @@ -69,7 +116,7 @@ Status CoreWorkerObjectInterface::Get(const std::vector &ids, return Status::OK(); } -Status CoreWorkerObjectInterface::Get( +Status CoreWorkerObjectInterface::GetFromStoreProvider( StoreProviderType type, const std::unordered_set &object_ids, int64_t timeout_ms, std::unordered_map> *results) { @@ -87,17 +134,116 @@ Status CoreWorkerObjectInterface::Get( return Status::OK(); } -Status CoreWorkerObjectInterface::Wait(const std::vector &object_ids, - int num_objects, int64_t timeout_ms, - std::vector *results) { - return store_providers_[StoreProviderType::PLASMA]->Wait( - object_ids, num_objects, timeout_ms, worker_context_.GetCurrentTaskID(), results); +Status CoreWorkerObjectInterface::Wait(const std::vector &ids, int num_objects, + int64_t timeout_ms, std::vector *results) { + (*results).resize(ids.size(), false); + + if (num_objects <= 0 || num_objects > static_cast(ids.size())) { + return Status::Invalid("num_objects value is not valid"); + } + + EnumUnorderedMap> + object_ids_per_store_provider; + GroupObjectIdsByStoreProvider(ids, &object_ids_per_store_provider); + + // Wait from all the store providers with timeout set to 0. This is to avoid the case + // where we might use up the entire timeout on trying to get objects from one store + // provider before even trying another (which might have all of the objects available). + RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(ids, object_ids_per_store_provider, + /* timeout_ms= */ 0, &num_objects, + results)); + + if (num_objects > 0) { + // Wait from all the store providers with the specified timeout + // if the required number of objects haven't been ready yet. + RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(ids, object_ids_per_store_provider, + /* timeout_ms= */ timeout_ms, + &num_objects, results)); + } + + return Status::OK(); +} + +Status CoreWorkerObjectInterface::WaitFromMultipleStoreProviders( + const std::vector &ids, + const EnumUnorderedMap> + &ids_per_provider, + int64_t timeout_ms, int *num_objects, std::vector *results) { + std::unordered_map object_counts; + for (const auto &entry : ids) { + auto iter = object_counts.find(entry); + if (iter == object_counts.end()) { + object_counts.emplace(entry, 1); + } else { + iter->second++; + } + } + + auto remaining_timeout_ms = timeout_ms; + for (const auto &entry : ids_per_provider) { + std::unordered_set objects; + auto start_time = current_time_ms(); + int required_objects = std::min(static_cast(entry.second.size()), *num_objects); + RAY_RETURN_NOT_OK(WaitFromStoreProvider(entry.first, entry.second, required_objects, + remaining_timeout_ms, &objects)); + if (remaining_timeout_ms > 0) { + int64_t duration = current_time_ms() - start_time; + remaining_timeout_ms = + std::max(static_cast(0), remaining_timeout_ms - duration); + } + for (const auto &entry : objects) { + *num_objects -= object_counts[entry]; + } + + for (size_t i = 0; i < ids.size(); i++) { + if (objects.count(ids[i]) > 0) { + (*results)[i] = true; + } + } + + if (*num_objects <= 0) { + break; + } + } + + return Status::OK(); +}; + +Status CoreWorkerObjectInterface::WaitFromStoreProvider( + StoreProviderType type, const std::unordered_set &object_ids, + int num_objects, int64_t timeout_ms, std::unordered_set *results) { + std::vector ids(object_ids.begin(), object_ids.end()); + if (!ids.empty()) { + std::vector objects; + RAY_RETURN_NOT_OK(store_providers_[type]->Wait( + ids, num_objects, timeout_ms, worker_context_.GetCurrentTaskID(), &objects)); + RAY_CHECK(ids.size() == objects.size()); + for (size_t i = 0; i < objects.size(); i++) { + if (objects[i]) { + (*results).insert(ids[i]); + } + } + } + + return Status::OK(); } Status CoreWorkerObjectInterface::Delete(const std::vector &object_ids, bool local_only, bool delete_creating_tasks) { - return store_providers_[StoreProviderType::PLASMA]->Delete(object_ids, local_only, - delete_creating_tasks); + EnumUnorderedMap> + object_ids_per_store_provider; + GroupObjectIdsByStoreProvider(object_ids, &object_ids_per_store_provider); + + for (const auto &entry : object_ids_per_store_provider) { + auto type = entry.first; + bool is_plasma = (type == StoreProviderType::PLASMA); + + std::vector ids(entry.second.begin(), entry.second.end()); + RAY_RETURN_NOT_OK(store_providers_[type]->Delete( + ids, is_plasma ? local_only : false, is_plasma ? delete_creating_tasks : false)); + } + + return Status::OK(); } void CoreWorkerObjectInterface::AddStoreProvider(StoreProviderType type) { @@ -113,6 +259,10 @@ std::unique_ptr CoreWorkerObjectInterface::CreateStoreP case StoreProviderType::PLASMA: return std::unique_ptr( new CoreWorkerPlasmaStoreProvider(store_socket_, raylet_client_)); + case StoreProviderType::MEMORY: + return std::unique_ptr( + new CoreWorkerMemoryStoreProvider(memory_store_)); + break; default: RAY_LOG(FATAL) << "unknown store provider type " << static_cast(type); return nullptr; diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index bf8f71ecf..dab30a3b2 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -13,6 +13,7 @@ namespace ray { class CoreWorker; class CoreWorkerStoreProvider; +class CoreWorkerMemoryStore; /// The interface that contains all `CoreWorker` methods that are related to object store. class CoreWorkerObjectInterface { @@ -35,7 +36,7 @@ class CoreWorkerObjectInterface { /// \return Status. Status Put(const RayObject &object, const ObjectID &object_id); - /// Get a list of objects from the object store. + /// Get a list of objects from the object store. Duplicate object ids are supported. /// /// \param[in] ids IDs of the objects to get. /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. @@ -45,9 +46,13 @@ class CoreWorkerObjectInterface { std::vector> *results); /// Wait for a list of objects to appear in the object store. + /// Duplicate object ids are supported, and `num_objects` includes duplicate ids in this + /// case. + /// TODO(zhijunfu): it is probably more clear in semantics to just fail when there + /// are duplicates, and require it to be handled at application level. /// /// \param[in] IDs of the objects to wait for. - /// \param[in] num_returns Number of objects that should appear. + /// \param[in] num_objects Number of objects that should appear. /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. /// \param[out] results A bitset that indicates each object has appeared or not. /// \return Status. @@ -66,6 +71,21 @@ class CoreWorkerObjectInterface { bool delete_creating_tasks); private: + /// Helper function to get a list of objects from different store providers. + /// + /// \param[in] object_ids IDs of the objects to get. + /// \param[in] ids_per_provider A map from store provider type to the set of + // object ids for that store provider. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's -1. + /// \param[in/out] num_objects Number of objects that should appear before returning. + /// \param[out] results A bitset that indicates each object has appeared or not. + /// \return Status. + Status WaitFromMultipleStoreProviders( + const std::vector &object_ids, + const EnumUnorderedMap> + &ids_per_provider, + int64_t timeout_ms, int *num_objects, std::vector *results); + /// Helper function to get a list of objects from a specific store provider. /// /// \param[in] type The type of store provider to use. @@ -73,9 +93,23 @@ class CoreWorkerObjectInterface { /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's -1. /// \param[out] results Result list of objects data. /// \return Status. - Status Get(StoreProviderType type, const std::unordered_set &object_ids, - int64_t timeout_ms, - std::unordered_map> *results); + Status GetFromStoreProvider( + StoreProviderType type, const std::unordered_set &object_ids, + int64_t timeout_ms, + std::unordered_map> *results); + + /// Helper function to wait a list of objects from a specific store provider. + /// + /// \param[in] type The type of store provider to use. + /// \param[in] object_ids IDs of the objects to wait for. + /// \param[in] num_objects Number of objects that should appear before returning. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[out] results A bitset that indicates each object has appeared or not. + /// \return Status. + Status WaitFromStoreProvider(StoreProviderType type, + const std::unordered_set &object_ids, + int num_objects, int64_t timeout_ms, + std::unordered_set *results); /// Create a new store provider for the specified type on demand. std::unique_ptr CreateStoreProvider( @@ -92,6 +126,9 @@ class CoreWorkerObjectInterface { /// Store socket name. std::string store_socket_; + /// In-memory store for return objects. This is used for `MEMORY` store provider. + std::shared_ptr memory_store_; + /// All the store providers supported. EnumUnorderedMap> store_providers_; diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 752ee7a8d..675771a8d 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -10,7 +10,8 @@ namespace ray { /// A class that represents a `Get` request. class GetRequest { public: - GetRequest(std::unordered_set object_ids, bool remove_after_get); + GetRequest(std::unordered_set object_ids, size_t num_objects, + bool remove_after_get); const std::unordered_set &ObjectIds() const; @@ -31,9 +32,11 @@ class GetRequest { void Wait(); /// The object IDs involved in this request. - std::unordered_set object_ids_; + const std::unordered_set object_ids_; /// The object information for the objects in this request. std::unordered_map> objects_; + /// Number of objects required. + const size_t num_objects_; // Whether the requested objects should be removed from store // after `get` returns. @@ -44,8 +47,14 @@ class GetRequest { std::condition_variable cv_; }; -GetRequest::GetRequest(std::unordered_set object_ids, bool remove_after_get) - : object_ids_(std::move(object_ids)), remove_after_get_(remove_after_get) {} +GetRequest::GetRequest(std::unordered_set object_ids, size_t num_objects, + bool remove_after_get) + : object_ids_(std::move(object_ids)), + num_objects_(num_objects), + remove_after_get_(remove_after_get), + is_ready_(false) { + RAY_CHECK(num_objects_ <= object_ids_.size()); +} const std::unordered_set &GetRequest::ObjectIds() const { return object_ids_; } @@ -80,7 +89,7 @@ void GetRequest::Wait() { void GetRequest::Set(const ObjectID &object_id, std::shared_ptr object) { std::unique_lock lock(mutex_); objects_.emplace(object_id, object); - if (objects_.size() == object_ids_.size()) { + if (objects_.size() == num_objects_) { is_ready_ = true; cv_.notify_all(); } @@ -128,7 +137,8 @@ Status CoreWorkerMemoryStore::Put(const ObjectID &object_id, const RayObject &ob } Status CoreWorkerMemoryStore::Get(const std::vector &object_ids, - int64_t timeout_ms, bool remove_after_get, + int num_objects, int64_t timeout_ms, + bool remove_after_get, std::vector> *results) { (*results).resize(object_ids.size(), nullptr); @@ -164,9 +174,16 @@ Status CoreWorkerMemoryStore::Get(const std::vector &object_ids, return Status::OK(); } + if (object_ids.size() - remaining_ids.size() >= static_cast(num_objects)) { + // Already get enough objects. + return Status::OK(); + } + + size_t required_objects = num_objects - (object_ids.size() - remaining_ids.size()); + // Otherwise, create a GetRequest to track remaining objects. - get_request = - std::make_shared(std::move(remaining_ids), remove_after_get); + get_request = std::make_shared(std::move(remaining_ids), required_objects, + remove_after_get); for (const auto &object_id : get_request->ObjectIds()) { object_get_requests_[object_id].push_back(get_request); } diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 2d53fec94..0a8422f81 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -28,13 +28,14 @@ class CoreWorkerMemoryStore { /// Get a list of objects from the object store. /// - /// \param[in] object_ids IDs of the objects to get. Duplicates are allowed. + /// \param[in] object_ids IDs of the objects to get. Duplicates are not allowed. + /// \param[in] num_objects Number of objects that should appear. /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. /// \param[in] remove_after_get When to remove the objects from store after `Get` /// finishes. /// \param[out] results Result list of objects data. /// \return Status. - Status Get(const std::vector &object_ids, int64_t timeout_ms, + Status Get(const std::vector &object_ids, int num_objects, int64_t timeout_ms, bool remove_after_get, std::vector> *results); /// Delete a list of objects from the object store. diff --git a/src/ray/core_worker/store_provider/memory_store_provider.cc b/src/ray/core_worker/store_provider/memory_store_provider.cc index ef5c39e2c..e83a5a307 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.cc +++ b/src/ray/core_worker/store_provider/memory_store_provider.cc @@ -24,21 +24,17 @@ Status CoreWorkerMemoryStoreProvider::Put(const RayObject &object, Status CoreWorkerMemoryStoreProvider::Get( const std::vector &object_ids, int64_t timeout_ms, const TaskID &task_id, std::vector> *results) { - return store_->Get(object_ids, timeout_ms, true, results); + return store_->Get(object_ids, object_ids.size(), timeout_ms, true, results); } Status CoreWorkerMemoryStoreProvider::Wait(const std::vector &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, std::vector *results) { - if (num_objects != static_cast(object_ids.size())) { - return Status::Invalid("num_objects should equal to number of items in object_ids"); - } - (*results).resize(object_ids.size(), false); std::vector> result_objects; - auto status = store_->Get(object_ids, timeout_ms, false, &result_objects); + auto status = store_->Get(object_ids, num_objects, timeout_ms, false, &result_objects); if (status.ok()) { RAY_CHECK(result_objects.size() == object_ids.size()); for (size_t i = 0; i < object_ids.size(); i++) { diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h index b028a1e3b..63dbbca64 100644 --- a/src/ray/core_worker/store_provider/store_provider.h +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -89,7 +89,7 @@ class CoreWorkerStoreProvider { /// Wait for a list of objects to appear in the object store. /// /// \param[in] IDs of the objects to wait for. - /// \param[in] num_returns Number of objects that should appear. + /// \param[in] num_objects Number of objects that should appear before returning. /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. /// \param[in] task_id ID for the current task. /// \param[out] results A bitset that indicates each object has appeared or not. diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index e8e347fd1..de880f83e 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -106,10 +106,12 @@ CoreWorkerTaskInterface::CoreWorkerTaskInterface( task_submitters_.emplace(TaskTransportType::RAYLET, std::unique_ptr( new CoreWorkerRayletTaskSubmitter(raylet_client))); - task_submitters_.emplace(TaskTransportType::DIRECT_ACTOR, - std::unique_ptr( - new CoreWorkerDirectActorTaskSubmitter( - io_service, gcs_client, object_interface))); + task_submitters_.emplace( + TaskTransportType::DIRECT_ACTOR, + std::unique_ptr( + new CoreWorkerDirectActorTaskSubmitter( + io_service, gcs_client, + object_interface.CreateStoreProvider(StoreProviderType::MEMORY)))); } void CoreWorkerTaskInterface::BuildCommonTaskSpec( diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 6ccd62a3e..99a7f5ca8 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -499,7 +499,7 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) { RAY_CHECK_OK(provider.Put(buffers[i], ids[i])); } - // Test Wait(). + // Test Wait() with duplicate object ids. std::vector ids_with_duplicate; ids_with_duplicate.insert(ids_with_duplicate.end(), ids.begin(), ids.end()); // add the same ids again to test `Get` with duplicate object ids. @@ -514,6 +514,13 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) { ASSERT_EQ(wait_results.size(), 5); ASSERT_EQ(wait_results, std::vector({true, true, true, true, false})); + // Test Wait() with duplicate object ids, and the required `num_objects` + // is less than size of `wait_ids`. + wait_results.clear(); + RAY_CHECK_OK(provider.Wait(wait_ids, 4, -1, RandomTaskId(), &wait_results)); + ASSERT_EQ(wait_results.size(), 5); + ASSERT_EQ(wait_results, std::vector({true, true, true, true, false})); + // Test Get(). std::vector> results; RAY_CHECK_OK(provider.Get(ids_with_duplicate, -1, RandomTaskId(), &results)); @@ -542,6 +549,29 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) { ASSERT_EQ(results.size(), 2); ASSERT_TRUE(!results[0]); ASSERT_TRUE(!results[1]); + + // Test Wait() with objects which will become ready later. + std::vector unready_ids(buffers.size()); + for (size_t i = 0; i < unready_ids.size(); i++) { + unready_ids[i] = ObjectID::FromRandom(); + } + + auto thread_func = [&unready_ids, &provider, &buffers]() { + sleep(1); + + for (size_t i = 0; i < unready_ids.size(); i++) { + RAY_CHECK_OK(provider.Put(buffers[i], unready_ids[i])); + } + }; + + std::thread async_thread(thread_func); + + // wait for the objects to appear. + wait_results.clear(); + RAY_CHECK_OK( + provider.Wait(unready_ids, unready_ids.size(), -1, RandomTaskId(), &wait_results)); + // wait for the thread to finish. + async_thread.join(); } class ZeroNodeTest : public CoreWorkerTest { diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 0c59fa0b7..087560df4 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -17,12 +17,11 @@ bool HasByReferenceArgs(const TaskSpecification &spec) { CoreWorkerDirectActorTaskSubmitter::CoreWorkerDirectActorTaskSubmitter( boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client, - CoreWorkerObjectInterface &object_interface) + std::unique_ptr store_provider) : io_service_(io_service), gcs_client_(gcs_client), client_call_manager_(io_service), - store_provider_( - object_interface.CreateStoreProvider(StoreProviderType::LOCAL_PLASMA)) { + store_provider_(std::move(store_provider)) { RAY_CHECK_OK(SubscribeActorUpdates()); } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index f9528a852..056615ff0 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -28,9 +28,9 @@ struct ActorStateData { class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter { public: - CoreWorkerDirectActorTaskSubmitter(boost::asio::io_service &io_service, - gcs::RedisGcsClient &gcs_client, - CoreWorkerObjectInterface &object_interface); + CoreWorkerDirectActorTaskSubmitter( + boost::asio::io_service &io_service, gcs::RedisGcsClient &gcs_client, + std::unique_ptr store_provider); /// Submit a task to an actor for execution. ///