From eeb15771bae2476e21200245454a95b857df18df Mon Sep 17 00:00:00 2001 From: Yuhong Guo Date: Wed, 15 Aug 2018 13:01:23 +0800 Subject: [PATCH] Add `ray.internal.free` (#2542) --- python/ray/__init__.py | 3 +- python/ray/internal/__init__.py | 7 ++ python/ray/internal/internal_api.py | 48 ++++++++++++ .../lib/python/local_scheduler_extension.cc | 39 ++++++++++ src/local_scheduler/local_scheduler_client.cc | 17 ++++ src/local_scheduler/local_scheduler_client.h | 12 +++ src/ray/gcs/tables.cc | 4 + src/ray/gcs/tables.h | 7 ++ .../object_manager/format/object_manager.fbs | 8 +- src/ray/object_manager/object_buffer_pool.cc | 9 +++ src/ray/object_manager/object_buffer_pool.h | 6 ++ src/ray/object_manager/object_directory.cc | 18 +++++ src/ray/object_manager/object_directory.h | 9 +++ src/ray/object_manager/object_manager.cc | 52 +++++++++++++ src/ray/object_manager/object_manager.h | 15 ++++ src/ray/raylet/format/node_manager.fbs | 10 +++ src/ray/raylet/node_manager.cc | 5 ++ src/ray/raylet/reconstruction_policy_test.cc | 1 + test/runtest.py | 78 +++++++++++++++++++ 19 files changed, 346 insertions(+), 2 deletions(-) create mode 100644 python/ray/internal/__init__.py create mode 100644 python/ray/internal/internal_api.py diff --git a/python/ray/__init__.py b/python/ray/__init__.py index d05aa6d9e..2130f9ff9 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -54,6 +54,7 @@ from ray.worker import (error_info, init, connect, disconnect, get, put, wait, from ray.worker import (SCRIPT_MODE, WORKER_MODE, LOCAL_MODE, SILENT_MODE, PYTHON_MODE) # noqa: E402 from ray.worker import global_state # noqa: E402 +import ray.internal # noqa: E402 # We import ray.actor because some code is run in actor.py which initializes # some functions in the worker. import ray.actor # noqa: F401 @@ -68,7 +69,7 @@ __all__ = [ "remote", "profile", "actor", "method", "get_gpu_ids", "get_resource_ids", "get_webui_url", "register_custom_serializer", "shutdown", "SCRIPT_MODE", "WORKER_MODE", "LOCAL_MODE", "SILENT_MODE", "PYTHON_MODE", "global_state", - "ObjectID", "_config", "__version__" + "ObjectID", "_config", "__version__", "internal" ] import ctypes # noqa: E402 diff --git a/python/ray/internal/__init__.py b/python/ray/internal/__init__.py new file mode 100644 index 000000000..e28ebcb2f --- /dev/null +++ b/python/ray/internal/__init__.py @@ -0,0 +1,7 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.internal.internal_api import free + +__all__ = ["free"] diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py new file mode 100644 index 000000000..062d633ee --- /dev/null +++ b/python/ray/internal/internal_api.py @@ -0,0 +1,48 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import ray.local_scheduler +import ray.worker +from ray import profiling + +__all__ = ["free"] + + +def free(object_ids, local_only=False, worker=None): + """Free a list of IDs from object stores. + + This function is a low-level API which should be used in restricted + scenarios. + + If local_only is false, the request will be send to all object stores. + + This method will not return any value to indicate whether the deletion is + successful or not. This function is an instruction to object store. If + the some of the objects are in use, object stores will delete them later + when the ref count is down to 0. + + Args: + object_ids (List[ObjectID]): List of object IDs to delete. + local_only (bool): Whether only deleting the list of objects in local + object store or all object stores. + """ + if worker is None: + worker = ray.worker.get_global_worker() + + if isinstance(object_ids, ray.ObjectID): + object_ids = [object_ids] + + if not isinstance(object_ids, list): + raise TypeError("free() expects a list of ObjectID, got {}".format( + type(object_ids))) + + worker.check_connected() + with profiling.profile("ray.free", worker=worker): + if len(object_ids) == 0: + return + + if worker.use_raylet: + worker.local_scheduler_client.free(object_ids, local_only) + else: + raise Exception("Free is not supported in legacy backend.") diff --git a/src/local_scheduler/lib/python/local_scheduler_extension.cc b/src/local_scheduler/lib/python/local_scheduler_extension.cc index d5f9d1a7a..07853aaf8 100644 --- a/src/local_scheduler/lib/python/local_scheduler_extension.cc +++ b/src/local_scheduler/lib/python/local_scheduler_extension.cc @@ -414,6 +414,43 @@ static PyObject *PyLocalSchedulerClient_push_profile_events(PyObject *self, Py_RETURN_NONE; } +static PyObject *PyLocalSchedulerClient_free(PyObject *self, PyObject *args) { + PyObject *py_object_ids; + PyObject *py_local_only; + + if (!PyArg_ParseTuple(args, "OO", &py_object_ids, &py_local_only)) { + return NULL; + } + + bool local_only = static_cast(PyObject_IsTrue(py_local_only)); + + // Convert object ids. + PyObject *iter = PyObject_GetIter(py_object_ids); + if (!iter) { + return NULL; + } + std::vector object_ids; + while (true) { + PyObject *next = PyIter_Next(iter); + ObjectID object_id; + if (!next) { + break; + } + if (!PyObjectToUniqueID(next, &object_id)) { + // Error parsing object ID. + return NULL; + } + object_ids.push_back(object_id); + } + + // Invoke local_scheduler_free_objects_in_object_store. + local_scheduler_free_objects_in_object_store( + reinterpret_cast(self) + ->local_scheduler_connection, + object_ids, local_only); + Py_RETURN_NONE; +} + static PyMethodDef PyLocalSchedulerClient_methods[] = { {"disconnect", (PyCFunction) PyLocalSchedulerClient_disconnect, METH_NOARGS, "Notify the local scheduler that this client is exiting gracefully."}, @@ -446,6 +483,8 @@ static PyMethodDef PyLocalSchedulerClient_methods[] = { {"push_profile_events", (PyCFunction) PyLocalSchedulerClient_push_profile_events, METH_VARARGS, "Store some profiling events in the GCS."}, + {"free", (PyCFunction) PyLocalSchedulerClient_free, METH_VARARGS, + "Free a list of objects from object stores."}, {NULL} /* Sentinel */ }; diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 775c3518e..994c0f176 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -351,3 +351,20 @@ void local_scheduler_push_profile_events( ray::protocol::MessageType::PushProfileEventsRequest), fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); } + +void local_scheduler_free_objects_in_object_store( + LocalSchedulerConnection *conn, + const std::vector &object_ids, + bool local_only) { + flatbuffers::FlatBufferBuilder fbb; + auto message = ray::protocol::CreateFreeObjectsRequest( + fbb, local_only, to_flatbuf(fbb, object_ids)); + fbb.Finish(message); + + int success = write_message( + conn->conn, + static_cast( + ray::protocol::MessageType::FreeObjectsInObjectStoreRequest), + fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); + RAY_CHECK(success == 0) << "Failed to write message to raylet."; +} diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index 5026313d2..22eb51484 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -244,4 +244,16 @@ void local_scheduler_push_profile_events( LocalSchedulerConnection *conn, const ProfileTableDataT &profile_events); +/// Free a list of objects from object stores. +/// +/// \param conn The connection information. +/// \param object_ids A list of ObjectsIDs to be deleted. +/// \param local_only Whether keep this request with local object store +/// or send it to all the object stores. +/// \return Void. +void local_scheduler_free_objects_in_object_store( + LocalSchedulerConnection *conn, + const std::vector &object_ids, + bool local_only); + #endif diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index dd09a2a3b..ce83f1464 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -426,6 +426,10 @@ const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) const } } +const std::unordered_map &ClientTable::GetAllClients() const { + return client_cache_; +} + template class Log; template class Log; template class Table; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 528bcff4f..ba8b561ea 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -651,6 +651,13 @@ class ClientTable : private Log { /// \return Whether the client with ID client_id is removed. bool IsRemoved(const ClientID &client_id) const; + /// Get the information of all clients. + /// + /// Note: The return value contains ClientID::nil() which should be filtered. + /// + /// \return The client ID to client information map. + const std::unordered_map &GetAllClients() const; + private: /// Handle a client table notification. void HandleNotification(AsyncGcsClient *client, const ClientTableDataT ¬ifications); diff --git a/src/ray/object_manager/format/object_manager.fbs b/src/ray/object_manager/format/object_manager.fbs index d47905287..008ce9786 100644 --- a/src/ray/object_manager/format/object_manager.fbs +++ b/src/ray/object_manager/format/object_manager.fbs @@ -4,7 +4,8 @@ namespace ray.object_manager.protocol; enum MessageType:int { ConnectClient = 1, PushRequest, - PullRequest + PullRequest, + FreeRequest } table PushRequestMessage { @@ -31,3 +32,8 @@ table ConnectClientMessage { // Whether this is a transfer connection. is_transfer: bool; } + +table FreeRequestMessage { + // List of IDs to be deleted. + object_ids: [string]; +} diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 0a6f5d86b..fdb362359 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -185,4 +185,13 @@ std::vector ObjectBufferPool::BuildChunks( return chunks; } +void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { + std::vector plasma_ids; + plasma_ids.reserve(object_ids.size()); + for (const auto &id : object_ids) { + plasma_ids.push_back(id.to_plasma_id()); + } + ARROW_CHECK_OK(store_client_.Delete(plasma_ids)); +} + } // namespace ray diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index 3f15b2023..9fe347451 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -123,6 +123,12 @@ class ObjectBufferPool { /// \param chunk_index The index of the chunk. void SealChunk(const ObjectID &object_id, uint64_t chunk_index); + /// Free a list of objects from object store. + /// + /// \param object_ids the The list of ObjectIDs to be deleted. + /// \return Void. + void FreeObjects(const std::vector &object_ids); + private: /// Abort the create operation associated with an object. This destroys the buffer /// state, including create operations in progress for all chunks of the object. diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 80a3f990d..38c7cdd45 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -117,6 +117,24 @@ ray::Status ObjectDirectory::GetInformation(const ClientID &client_id, return ray::Status::OK(); } +void ObjectDirectory::RunFunctionForEachClient( + const InfoSuccessCallback &client_function) { + const auto &clients = gcs_client_->client_table().GetAllClients(); + for (const auto &client_pair : clients) { + const ClientTableDataT &data = client_pair.second; + if (client_pair.first == ClientID::nil() || + client_pair.first == gcs_client_->client_table().GetLocalClientId() || + !data.is_insertion) { + continue; + } else { + const auto &info = + RemoteConnectionInfo(client_pair.first, data.node_manager_address, + static_cast(data.object_manager_port)); + client_function(info); + } + } +} + ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_id, const ObjectID &object_id, const OnLocationsFound &callback) { diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index 0de499135..e77e74a94 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -101,6 +101,13 @@ class ObjectDirectoryInterface { /// \return Status of whether this method succeeded. virtual ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) = 0; + + /// Go through all the client information. + /// + /// \param success_cb A callback which handles the success of this method. + /// This function will be called multiple times. + /// \return Void. + virtual void RunFunctionForEachClient(const InfoSuccessCallback &client_function) = 0; }; /// Ray ObjectDirectory declaration. @@ -115,6 +122,8 @@ class ObjectDirectory : public ObjectDirectoryInterface { const InfoSuccessCallback &success_callback, const InfoFailureCallback &fail_callback) override; + void RunFunctionForEachClient(const InfoSuccessCallback &client_function) override; + ray::Status LookupLocations(const ObjectID &object_id, const OnLocationsFound &callback) override; diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index a7c59bc1d..21b44ed45 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -1,4 +1,5 @@ #include "ray/object_manager/object_manager.h" +#include "common/common_protocol.h" #include "ray/util/util.h" namespace asio = boost::asio; @@ -655,6 +656,10 @@ void ObjectManager::ProcessClientMessage(std::shared_ptr &c ConnectClient(conn, message); break; } + case static_cast(object_manager_protocol::MessageType::FreeRequest): { + ReceiveFreeRequest(conn, message); + break; + } case static_cast(protocol::MessageType::DisconnectClient): { // TODO(hme): Disconnect without depending on the node manager protocol. DisconnectClient(conn, message); @@ -755,4 +760,51 @@ void ObjectManager::ExecuteReceiveObject(const ClientID &client_id, << "/" << config_.max_receives; } +void ObjectManager::ReceiveFreeRequest(std::shared_ptr &conn, + const uint8_t *message) { + auto free_request = + flatbuffers::GetRoot(message); + std::vector object_ids = from_flatbuf(*free_request->object_ids()); + // This RPC should come from another Object Manager. + // Keep this request local. + bool local_only = true; + FreeObjects(object_ids, local_only); + conn->ProcessMessages(); +} + +void ObjectManager::FreeObjects(const std::vector &object_ids, + bool local_only) { + buffer_pool_.FreeObjects(object_ids); + if (!local_only) { + SpreadFreeObjectRequest(object_ids); + } +} + +void ObjectManager::SpreadFreeObjectRequest(const std::vector &object_ids) { + // This code path should be called from node manager. + flatbuffers::FlatBufferBuilder fbb; + flatbuffers::Offset request = + object_manager_protocol::CreateFreeRequestMessage(fbb, to_flatbuf(fbb, object_ids)); + fbb.Finish(request); + auto function_on_client = [this, &fbb](const RemoteConnectionInfo &connection_info) { + std::shared_ptr conn; + connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE, + connection_info.client_id, &conn); + if (conn == nullptr) { + conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE, + connection_info); + connection_pool_.RegisterSender(ConnectionPool::ConnectionType::MESSAGE, + connection_info.client_id, conn); + } + ray::Status status = conn->WriteMessage( + static_cast(object_manager_protocol::MessageType::FreeRequest), + fbb.GetSize(), fbb.GetBufferPointer()); + if (status.ok()) { + connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn); + } + // TODO(Yuhong): Implement ConnectionPool::RemoveSender and call it in "else". + }; + object_directory_->RunFunctionForEachClient(function_on_client); +} + } // namespace ray diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 7ff2c4930..11b5d7a6c 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -163,6 +163,13 @@ class ObjectManager : public ObjectManagerInterface { uint64_t num_required_objects, bool wait_local, const WaitCallback &callback); + /// Free a list of objects from object store. + /// + /// \param object_ids the The list of ObjectIDs to be deleted. + /// \param local_only Whether keep this request with local object store + /// or send it to all the object stores. + void FreeObjects(const std::vector &object_ids, bool local_only); + private: friend class TestObjectManager; @@ -214,6 +221,11 @@ class ObjectManager : public ObjectManagerInterface { /// Completion handler for Wait. void WaitComplete(const UniqueID &wait_id); + /// Spread the Free request to all objects managers. + /// + /// \param object_ids the The list of ObjectIDs to be deleted. + void SpreadFreeObjectRequest(const std::vector &object_ids); + /// Handle starting, running, and stopping asio io_service. void StartIOService(); void RunSendService(); @@ -271,6 +283,9 @@ class ObjectManager : public ObjectManagerInterface { /// Handles receiving a pull request message. void ReceivePullRequest(std::shared_ptr &conn, const uint8_t *message); + /// Handles freeing objects request. + void ReceiveFreeRequest(std::shared_ptr &conn, + const uint8_t *message); /// Handles connect message of a new client connection. void ConnectClient(std::shared_ptr &conn, const uint8_t *message); diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 0fb63290d..f339a08f4 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -68,6 +68,8 @@ enum MessageType:int { // Push some profiling events to the GCS. When sending this message to the // node manager, the message itself is serialized as a ProfileTableData object. PushProfileEventsRequest, + // Free the objects in objects store. + FreeObjectsInObjectStoreRequest, } table TaskExecutionSpecification { @@ -177,3 +179,11 @@ table PushErrorRequest { // The timestamp of the error message. timestamp: double; } + +table FreeObjectsRequest { + // Whether keep this request with local object store + // or send it to all the object stores. + local_only: bool; + // List of object ids we'll delete from object store. + object_ids: [string]; +} diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 0972a6553..5ddb5d74a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -712,6 +712,11 @@ void NodeManager::ProcessClientMessage( RAY_CHECK_OK(gcs_client_->profile_table().AddProfileEventBatch(*message)); } break; + case protocol::MessageType::FreeObjectsInObjectStoreRequest: { + auto message = flatbuffers::GetRoot(message_data); + std::vector object_ids = from_flatbuf(*message->object_ids()); + object_manager_.FreeObjects(object_ids, message->local_only()); + } break; default: RAY_LOG(FATAL) << "Received unexpected message type " << message_type; diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index a0a4334ad..38861543d 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -47,6 +47,7 @@ class MockObjectDirectory : public ObjectDirectoryInterface { MOCK_METHOD3(ReportObjectAdded, ray::Status(const ObjectID &, const ClientID &, const ObjectInfoT &)); MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &)); + MOCK_METHOD1(RunFunctionForEachClient, void(const InfoSuccessCallback &success_cb)); private: std::vector> callbacks_; diff --git a/test/runtest.py b/test/runtest.py index e5654e922..b94cf4943 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1205,6 +1205,84 @@ class APITest(unittest.TestCase): # test multi-threading in the worker ray.get(test_multi_threading_in_worker.remote()) + @unittest.skipIf( + os.environ.get("RAY_USE_XRAY") != "1", + "This test only works with xray.") + def testFreeObjectsMultiNode(self): + ray.worker._init( + start_ray_local=True, + num_local_schedulers=3, + num_workers=1, + num_cpus=[1, 1, 1], + resources=[{ + "Custom0": 1 + }, { + "Custom1": 1 + }, { + "Custom2": 1 + }], + use_raylet=True) + + @ray.remote(resources={"Custom0": 1}) + def run_on_0(): + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(resources={"Custom1": 1}) + def run_on_1(): + return ray.worker.global_worker.plasma_client.store_socket_name + + @ray.remote(resources={"Custom2": 1}) + def run_on_2(): + return ray.worker.global_worker.plasma_client.store_socket_name + + def create(): + a = run_on_0.remote() + b = run_on_1.remote() + c = run_on_2.remote() + (l1, l2) = ray.wait([a, b, c], num_returns=3) + assert len(l1) == 3 + assert len(l2) == 0 + return (a, b, c) + + def flush(): + # Flush the Release History. + # Current Plasma Client Cache will maintain 64-item list. + # If the number changed, this will fail. + print("Start Flush!") + for i in range(64): + ray.get( + [run_on_0.remote(), + run_on_1.remote(), + run_on_2.remote()]) + print("Flush finished!") + + def run_one_test(local_only): + (a, b, c) = create() + # The three objects should be generated on different object stores. + assert ray.get(a) != ray.get(b) + assert ray.get(a) != ray.get(c) + assert ray.get(c) != ray.get(b) + ray.internal.free([a, b, c], local_only=local_only) + flush() + return (a, b, c) + + # Case 1: run this local_only=False. All 3 objects will be deleted. + (a, b, c) = run_one_test(False) + (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=1) + # All the objects are deleted. + assert len(l1) == 0 + assert len(l2) == 3 + # Case 2: run this local_only=True. Only 1 object will be deleted. + (a, b, c) = run_one_test(True) + (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=3) + # One object is deleted and 2 objects are not. + assert len(l1) == 2 + assert len(l2) == 1 + # The deleted object will have the same store with the driver. + local_return = ray.worker.global_worker.plasma_client.store_socket_name + for object_id in l1: + assert ray.get(object_id) != local_return + @unittest.skipIf( os.environ.get('RAY_USE_NEW_GCS', False),