Revert inline objects PR (#4125)

* Revert "Inline objects (#3756)"

This reverts commit f987572795.

* fix rebase problems

* more rebase fixes

* add back debug statement
This commit is contained in:
Philipp Moritz
2019-02-22 18:21:01 -08:00
committed by Stephanie Wang
parent f1239a7a63
commit 9b3ce3e64b
18 changed files with 75 additions and 352 deletions
@@ -92,8 +92,6 @@ ray {
// See src/ray/ray_config_def.h for options.
config {
// Maximum size of an inline object (bytes).
inline_object_max_size_bytes: 0
}
}
+1 -57
View File
@@ -1151,14 +1151,8 @@ def test_object_transfer_dump(ray_start_cluster):
cluster = ray_start_cluster
num_nodes = 3
# Set the inline object size to 0 to force all objects to be written to
# plasma.
config = json.dumps({"inline_object_max_size_bytes": 0})
for i in range(num_nodes):
cluster.add_node(
resources={str(i): 1},
object_store_memory=10**9,
_internal_config=config)
cluster.add_node(resources={str(i): 1}, object_store_memory=10**9)
ray.init(redis_address=cluster.redis_address)
@ray.remote
@@ -2659,56 +2653,6 @@ def test_wait_reconstruction(shutdown_only):
assert len(ready_ids) == 1
def test_inline_objects(shutdown_only):
    """Check that inlined actor results survive eviction and larger ones do not.

    An actor produces a small object ("inline") and a larger one
    (``10000 * [1]``). Each is fetched once, deleted from the local plasma
    store, and fetched again: the small object is expected to be retrievable
    at least some of the time, while fetching the larger object must raise
    ``ray.exceptions.UnreconstructableError``.
    """
    config = json.dumps({"initial_reconstruction_timeout_milliseconds": 200})
    ray.init(num_cpus=1, object_store_memory=10**7, _internal_config=config)

    @ray.remote
    class Actor(object):
        def create_inline_object(self):
            return "inline"

        def create_non_inline_object(self):
            return 10000 * [1]

        def get(self):
            return

    a = Actor.remote()
    plasma_client = ray.worker.global_worker.plasma_client

    # Count the number of objects that were successfully inlined.
    inlined = 0
    for _ in range(100):
        inline_object = a.create_inline_object.remote()
        ray.get(inline_object)
        plasma_id = ray.pyarrow.plasma.ObjectID(inline_object.binary())
        plasma_client.delete([plasma_id])
        # Make sure we can still get an inlined object created by an actor even
        # after it has been evicted.
        try:
            value = ray.get(inline_object)
            assert value == "inline"
            inlined += 1
        except ray.exceptions.UnreconstructableError:
            # Deliberately tolerated: see the note below on why some objects
            # may not have been inlined.
            pass
    # Make sure some objects were inlined. Some of them may not get inlined
    # because we evict the object soon after creating it.
    assert inlined > 0

    # Non-inlined objects are not able to be recreated after eviction.
    for _ in range(10):
        non_inline_object = a.create_non_inline_object.remote()
        ray.get(non_inline_object)
        plasma_id = ray.pyarrow.plasma.ObjectID(non_inline_object.binary())
        # This while loop is necessary because sometimes the object is still
        # there immediately after plasma_client.delete.
        while plasma_client.contains(plasma_id):
            plasma_client.delete([plasma_id])
        # Objects created by an actor that were evicted and larger than the
        # maximum inline object size cannot be retrieved or reconstructed.
        # Only the ray.get call matters here; comparing its (never produced)
        # result would be a discarded expression.
        with pytest.raises(ray.exceptions.UnreconstructableError):
            ray.get(non_inline_object)
def test_ray_setproctitle(shutdown_only):
ray.init(num_cpus=2)
+6 -1
View File
@@ -210,7 +210,7 @@ def test_actor_broadcast(ray_start_cluster):
def test_object_transfer_retry(ray_start_empty_cluster):
cluster = ray_start_empty_cluster
repeated_push_delay = 10
repeated_push_delay = 4
# Force the sending object manager to allow duplicate pushes again sooner.
# Also, force the receiving object manager to retry the Pull sooner. We
@@ -262,6 +262,11 @@ def test_object_transfer_retry(ray_start_empty_cluster):
ray.worker.global_worker.plasma_client.contains(
ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids)
end_time = time.time()
# Make sure that the first time the objects get transferred, it happens
# quickly.
assert end_time - start_time < repeated_push_delay
# Get the objects again and make sure they get transferred.
xs = ray.get(x_ids)
end_transfer_time = time.time()
-8
View File
@@ -122,18 +122,10 @@ table FunctionTableData {
// One entry in an object's table log. Each entry records either that the
// object appeared on a node manager or that it was evicted by one.
table ObjectTableData {
// The size of the object.
object_size: long;
// Whether the object is inlined. Inline objects are objects whose data and
// metadata are inlined in the GCS object table entry, which normally only
// specifies the object location.
inline_object_flag: bool;
// The node manager ID that this object appeared on or was evicted by.
manager: string;
// Whether this entry is an addition or a deletion.
is_eviction: bool;
// In-line object data. Filled in by the writer only when inline_object_flag
// is set.
inline_object_data: [ubyte];
// In-line object metadata. Filled in by the writer only when
// inline_object_flag is set.
inline_object_metadata: [ubyte];
}
table TaskReconstructionData {
+25 -87
View File
@@ -8,21 +8,15 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,
namespace {
/// Process a suffix of the object table log.
/// If object is inlined (inline_object_flag = TRUE), its data and metadata are
/// stored with the object's entry so we read them into inline_object_data, and
/// inline_object_metadata, respectively.
/// If object is not inlined, store the result in client_ids.
/// This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix.
/// This function also stores a bool in has_been_created indicating whether the
/// object has ever been created before.
/// Process a suffix of the object table log and store the result in
/// client_ids. This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix. This also stores a
/// bool in has_been_created indicating whether the object has ever been
/// created before.
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table,
std::unordered_set<ClientID> *client_ids,
bool *inline_object_flag,
std::vector<uint8_t> *inline_object_data,
std::string *inline_object_metadata, bool *has_been_created) {
bool *has_been_created) {
// location_history contains the history of locations of the object (it is a log),
// which might look like the following:
// client1.is_eviction = false
@@ -30,9 +24,6 @@ void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history
// client2.is_eviction = false
// In such a scenario, we want to indicate client2 is the only client that contains
// the object, which the following code achieves.
//
// If object is inlined each entry contains both the object's data and metadata,
// so we don't care about its location.
if (!location_history.empty()) {
// If there are entries, then the object has been created. Once this flag
// is set to true, it should never go back to false.
@@ -40,35 +31,18 @@ void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history
}
for (const auto &object_table_data : location_history) {
ClientID client_id = ClientID::from_binary(object_table_data.manager);
if (object_table_data.inline_object_flag) {
if (!*inline_object_flag) {
// This is the first time we're receiving the inline object data. Read
// object's data from the GCS entry.
*inline_object_flag = object_table_data.inline_object_flag;
inline_object_data->assign(object_table_data.inline_object_data.begin(),
object_table_data.inline_object_data.end());
inline_object_metadata->assign(object_table_data.inline_object_metadata.begin(),
object_table_data.inline_object_metadata.end());
}
// We got the data and metadata of the object so exit the loop.
break;
}
if (!object_table_data.is_eviction) {
client_ids->insert(client_id);
} else {
client_ids->erase(client_id);
}
}
if (!*inline_object_flag) {
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;
}
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;
}
}
}
@@ -88,8 +62,6 @@ void ObjectDirectory::RegisterBackend() {
// Update entries for this object.
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&it->second.current_object_locations,
&it->second.inline_object_flag, &it->second.inline_object_data,
&it->second.inline_object_metadata,
&it->second.has_been_created);
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
@@ -102,8 +74,6 @@ void ObjectDirectory::RegisterBackend() {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations,
it->second.inline_object_flag, it->second.inline_object_data,
it->second.inline_object_metadata,
it->second.has_been_created);
}
};
@@ -114,25 +84,13 @@ void ObjectDirectory::RegisterBackend() {
ray::Status ObjectDirectory::ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const plasma::ObjectBuffer &plasma_buffer) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? "
<< inline_object_flag;
const object_manager::protocol::ObjectInfoT &object_info) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id;
// Append the addition entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = false;
data->object_size = object_info.data_size;
data->inline_object_flag = inline_object_flag;
if (inline_object_flag) {
// Add object's data to its GCS entry.
data->inline_object_data.assign(
plasma_buffer.data->data(),
plasma_buffer.data->data() + plasma_buffer.data->size());
data->inline_object_metadata.assign(
plasma_buffer.metadata->data(),
plasma_buffer.metadata->data() + plasma_buffer.metadata->size());
}
ray::Status status =
gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr);
return status;
@@ -184,19 +142,16 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) {
if (listener.second.current_object_locations.count(client_id) > 0) {
// If the subscribed object has the removed client as a location, update
// its locations with an empty log so that the location will be removed.
UpdateObjectLocations(
{}, gcs_client_->client_table(), &listener.second.current_object_locations,
&listener.second.inline_object_flag, &listener.second.inline_object_data,
&listener.second.inline_object_metadata, &listener.second.has_been_created);
UpdateObjectLocations({}, gcs_client_->client_table(),
&listener.second.current_object_locations,
&listener.second.has_been_created);
// Re-call all the subscribed callbacks for the object, since its
// locations have changed.
for (const auto &callback_pair : listener.second.callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(
object_id, listener.second.current_object_locations,
listener.second.inline_object_flag, listener.second.inline_object_data,
listener.second.inline_object_metadata, listener.second.has_been_created);
callback_pair.second(object_id, listener.second.current_object_locations,
listener.second.has_been_created);
}
}
}
@@ -222,14 +177,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// immediately notify the caller of the current known locations.
if (listener_state.has_been_created) {
auto &locations = listener_state.current_object_locations;
auto inline_object_flag = listener_state.inline_object_flag;
const auto &inline_object_data = listener_state.inline_object_data;
const auto &inline_object_metadata = listener_state.inline_object_metadata;
io_service_.post([callback, locations, inline_object_flag, inline_object_data,
inline_object_metadata, object_id]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata,
/*has_been_created=*/true);
io_service_.post([callback, locations, object_id]() {
callback(object_id, locations, /*has_been_created=*/true);
});
}
return status;
@@ -262,31 +211,20 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
bool inline_object_flag = false;
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool has_been_created = false;
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&client_ids, &inline_object_flag, &inline_object_data,
&inline_object_metadata, &has_been_created);
&client_ids, &has_been_created);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(object_id, client_ids, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
callback(object_id, client_ids, has_been_created);
});
} else {
// If we have locations cached due to a concurrent SubscribeObjectLocations
// call, call the callback immediately with the cached locations.
// If object inlined, we already have the object's data.
auto &locations = it->second.current_object_locations;
bool has_been_created = it->second.has_been_created;
bool inline_object_flag = it->second.inline_object_flag;
const auto &inline_object_data = it->second.inline_object_data;
const auto &inline_object_metadata = it->second.inline_object_metadata;
io_service_.post([callback, object_id, locations, inline_object_flag,
inline_object_data, inline_object_metadata, has_been_created]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
io_service_.post([callback, object_id, locations, has_been_created]() {
callback(object_id, locations, has_been_created);
});
}
return status;
+7 -22
View File
@@ -50,9 +50,9 @@ class ObjectDirectoryInterface {
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;
/// Callback for object location notifications.
using OnLocationsFound = std::function<void(
const ray::ObjectID &object_id, const std::unordered_set<ray::ClientID> &, bool,
const std::vector<uint8_t> &, const std::string &, bool has_been_created)>;
using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &,
bool has_been_created)>;
/// Lookup object locations. Callback may be invoked with empty list of client ids.
///
@@ -101,14 +101,10 @@ class ObjectDirectoryInterface {
/// \param object_id The object id that was put into the store.
/// \param client_id The client id corresponding to this node.
/// \param object_info Additional information about the object.
/// \param inline_object_flag Flag specifying whether object is inlined.
/// \param plasma_buffer Object data and metadata from plasma. This data is
/// only valid for inlined objects (i.e., when inline_object_flag=true).
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const plasma::ObjectBuffer &plasma_buffer) = 0;
const object_manager::protocol::ObjectInfoT &object_info) = 0;
/// Report objects removed from this client's store to the object directory.
///
@@ -160,11 +156,9 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id,
const ObjectID &object_id) override;
ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info,
bool inline_object_flag,
const plasma::ObjectBuffer &plasma_buffer) override;
ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) override;
ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;
@@ -182,15 +176,6 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<UniqueID, OnLocationsFound> callbacks;
/// The current set of known locations of this object.
std::unordered_set<ClientID> current_object_locations;
/// Specifies whether the object is inlined. The data and the metadata of
/// an inlined object are stored in the object's GCS entry. If this flag is
/// set (i.e., the object is inlined), the content of current_object_locations
/// can be ignored.
bool inline_object_flag;
/// Inlined object data, if inline_object_flag == true.
std::vector<uint8_t> inline_object_data;
/// Inlined object metadata, if inline_object_flag == true.
std::string inline_object_metadata;
/// This flag will get set to true if the object has ever been created. It
/// should never go back to false once set to true. If this is true, and
/// the current_object_locations is empty, then this means that the object
+9 -80
View File
@@ -10,15 +10,13 @@ namespace ray {
ObjectManager::ObjectManager(asio::io_service &main_service,
const ObjectManagerConfig &config,
std::shared_ptr<ObjectDirectoryInterface> object_directory,
plasma::PlasmaClient &store_client)
std::shared_ptr<ObjectDirectoryInterface> object_directory)
: config_(config),
object_directory_(std::move(object_directory)),
store_notification_(main_service, config_.store_socket_name),
buffer_pool_(config_.store_socket_name, config_.object_chunk_size),
send_work_(send_service_),
receive_work_(receive_service_),
store_client_(store_client),
connection_pool_(),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {
RAY_CHECK(config_.max_sends > 0);
@@ -69,30 +67,9 @@ void ObjectManager::HandleObjectAdded(
RAY_LOG(DEBUG) << "Object added " << object_id;
RAY_CHECK(local_objects_.count(object_id) == 0);
local_objects_[object_id].object_info = object_info;
// If this object was created from inlined data, this means it is already in GCS,
// so no need to write it again.
if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) {
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool inline_object_flag = false;
plasma::ObjectBuffer object_buffer;
if (object_info.data_size <= RayConfig::instance().inline_object_max_size_bytes()) {
// Inline object. Try to get the data from the object store.
plasma::ObjectID plasma_id = object_id.to_plasma_id();
RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
if (object_buffer.data != nullptr) {
// The object exists. Set inline_object_flag so that the object data
// will be stored in the GCS entry.
inline_object_flag = true;
// Mark this object as inlined, so that if this object is later
// evicted, we do not report it to the GCS.
local_inlined_objects_.insert(object_id);
}
}
ray::Status status =
object_directory_->ReportObjectAdded(object_id, client_id_, object_info);
RAY_CHECK_OK(object_directory_->ReportObjectAdded(object_id, client_id_, object_info,
inline_object_flag, object_buffer));
}
// Handle the unfulfilled_push_requests_ which contains the push request that is not
// completed due to unsatisfied local objects.
auto iter = unfulfilled_push_requests_.find(object_id);
@@ -114,16 +91,10 @@ void ObjectManager::HandleObjectAdded(
}
void ObjectManager::NotifyDirectoryObjectDeleted(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Object removed " << object_id;
auto it = local_objects_.find(object_id);
RAY_CHECK(it != local_objects_.end());
if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) {
// Inline object data can be retrieved by any node by contacting the GCS,
// so only report that the object was evicted if it wasn't inlined.
RAY_CHECK_OK(object_directory_->ReportObjectRemoved(object_id, client_id_));
}
local_objects_.erase(it);
local_inlined_objects_.erase(object_id);
ray::Status status = object_directory_->ReportObjectRemoved(object_id, client_id_);
}
ray::Status ObjectManager::SubscribeObjAdded(
@@ -138,26 +109,6 @@ ray::Status ObjectManager::SubscribeObjDeleted(
return ray::Status::OK();
}
/// Create an inlined object in the local object store from the data and
/// metadata carried with it, unless the object is already tracked as local.
///
/// \param object_id ID of the inlined object.
/// \param inline_object_data The inlined object's data.
/// \param inline_object_metadata The inlined object's metadata.
void ObjectManager::PutInlineObject(const ObjectID &object_id,
                                    const std::vector<uint8_t> &inline_object_data,
                                    const std::string &inline_object_metadata) {
  if (local_objects_.count(object_id) > 0) {
    // The object is already in the local object store; nothing to create.
    return;
  }

  // This function runs on a notification or after reading the object's entry
  // from GCS, so the entry is known to exist in GCS already. Recording the
  // object in local_inlined_objects_ keeps HandleObjectAdded() from writing
  // another copy of it to GCS.
  local_inlined_objects_.insert(object_id);

  std::string object_bytes(inline_object_data.begin(), inline_object_data.end());
  auto create_status = store_client_.CreateAndSeal(object_id.to_plasma_id(), object_bytes,
                                                   inline_object_metadata);
  // An object that already exists in the store is accepted as success.
  RAY_CHECK(create_status.IsPlasmaObjectExists() || create_status.ok())
      << create_status.message();
}
ray::Status ObjectManager::Pull(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Pull on " << client_id_ << " of object " << object_id;
// Check if object is already local.
@@ -177,13 +128,7 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
return object_directory_->SubscribeObjectLocations(
object_directory_pull_callback_id_, object_id,
[this](const ObjectID &object_id, const std::unordered_set<ClientID> &client_ids,
bool inline_object_flag, const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool created) {
if (inline_object_flag) {
// This is an inlined object. Store it in the Plasma store and return.
PutInlineObject(object_id, inline_object_data, inline_object_metadata);
return;
}
bool created) {
// Exit if the Pull request has already been fulfilled or canceled.
auto it = pull_requests_.find(object_id);
if (it == pull_requests_.end()) {
@@ -635,19 +580,11 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) {
RAY_RETURN_NOT_OK(object_directory_->LookupLocations(
object_id,
[this, wait_id](const ObjectID &lookup_object_id,
const std::unordered_set<ClientID> &client_ids,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool created) {
const std::unordered_set<ClientID> &client_ids, bool created) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
if (!client_ids.empty() || inline_object_flag) {
if (!client_ids.empty()) {
wait_state.remaining.erase(lookup_object_id);
wait_state.found.insert(lookup_object_id);
if (inline_object_flag) {
// This is an inlined object. Store it in the Plasma store and return.
PutInlineObject(lookup_object_id, inline_object_data,
inline_object_metadata);
}
}
RAY_LOG(DEBUG) << "Wait request " << wait_id << ": " << client_ids.size()
<< " locations found for object " << lookup_object_id;
@@ -681,11 +618,8 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
wait_id, object_id,
[this, wait_id](const ObjectID &subscribe_object_id,
const std::unordered_set<ClientID> &client_ids,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool created) {
if (!client_ids.empty() || inline_object_flag) {
const std::unordered_set<ClientID> &client_ids, bool created) {
if (!client_ids.empty()) {
RAY_LOG(DEBUG) << "Wait request " << wait_id
<< ": subscription notification received for object "
<< subscribe_object_id;
@@ -697,11 +631,6 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
// notification.
return;
}
if (inline_object_flag) {
// This is an inlined object. Store it in the Plasma store.
PutInlineObject(subscribe_object_id, inline_object_data,
inline_object_metadata);
}
auto &wait_state = object_id_wait_state->second;
wait_state.remaining.erase(subscribe_object_id);
wait_state.found.insert(subscribe_object_id);
+1 -20
View File
@@ -76,12 +76,9 @@ class ObjectManager : public ObjectManagerInterface {
/// \param main_service The main asio io_service.
/// \param config ObjectManager configuration.
/// \param object_directory An object implementing the object directory interface.
/// \param store_client Reference to Plasma store. This is used to get and put
/// inlined objects in the local object store.
explicit ObjectManager(boost::asio::io_service &main_service,
const ObjectManagerConfig &config,
std::shared_ptr<ObjectDirectoryInterface> object_directory,
plasma::PlasmaClient &store_client);
std::shared_ptr<ObjectDirectoryInterface> object_directory);
~ObjectManager();
@@ -354,12 +351,6 @@ class ObjectManager : public ObjectManagerInterface {
/// Handle Push task timeout.
void HandlePushTaskTimeout(const ObjectID &object_id, const ClientID &client_id);
/// Add inline object to object store. Called when reading the object entry
/// from GCS or upon receiving a notification about an inline object.
void PutInlineObject(const ObjectID &object_id,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata);
ClientID client_id_;
const ObjectManagerConfig config_;
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
@@ -389,10 +380,6 @@ class ObjectManager : public ObjectManagerInterface {
/// all incoming object transfers.
std::vector<std::thread> receive_threads_;
/// Reference to Plasma Store. This is used to get and put inlined objects in
/// the local object store.
plasma::PlasmaClient &store_client_;
/// Connection pool for reusing outgoing connections to remote object managers.
ConnectionPool connection_pool_;
@@ -400,12 +387,6 @@ class ObjectManager : public ObjectManagerInterface {
/// including when the object was last pushed to other object managers.
std::unordered_map<ObjectID, LocalObjectInfo> local_objects_;
/// Set of objects created from inlined data whose locations and/or evictions
/// should not be reported to the GCS. This includes objects that were
/// created from data retrieved from the GCS, since a GCS entry with the
/// inlined data already exists.
std::unordered_set<ObjectID> local_inlined_objects_;
/// This is used as the callback identifier in Pull for
/// SubscribeObjectLocations. We only need one identifier because we never need to
/// subscribe multiple times to the same object during Pull.
@@ -30,16 +30,13 @@ class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
const std::string &store_name)
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
: object_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_),
store_client_) {
RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str()));
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
@@ -91,7 +88,6 @@ class MockServer {
boost::asio::ip::tcp::acceptor object_manager_acceptor_;
boost::asio::ip::tcp::socket object_manager_socket_;
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
plasma::PlasmaClient store_client_;
ObjectManager object_manager_;
};
@@ -146,7 +142,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_1.max_receives = max_receives_a;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.push_timeout_ms = push_timeout_ms;
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1));
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));
// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(
@@ -158,7 +154,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_2.max_receives = max_receives_b;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.push_timeout_ms = push_timeout_ms;
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2));
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));
// connect to stores.
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));
@@ -10,7 +10,6 @@
namespace {
std::string store_executable;
int64_t wait_timeout_ms;
bool test_inline_objects = false;
}
namespace ray {
@@ -25,16 +24,13 @@ class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
const std::string &store_name)
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
: object_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
object_manager_socket_(main_service),
gcs_client_(gcs_client),
object_manager_(main_service, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_),
store_client_) {
RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str()));
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
RAY_CHECK_OK(RegisterGcs(main_service));
// Start listening for clients.
DoAcceptObjectManager();
@@ -86,7 +82,6 @@ class MockServer {
boost::asio::ip::tcp::acceptor object_manager_acceptor_;
boost::asio::ip::tcp::socket object_manager_socket_;
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
plasma::PlasmaClient store_client_;
ObjectManager object_manager_;
};
@@ -135,7 +130,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
om_config_1.push_timeout_ms = push_timeout_ms;
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1));
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));
// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(
@@ -147,7 +142,7 @@ class TestObjectManagerBase : public ::testing::Test {
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
om_config_2.push_timeout_ms = push_timeout_ms;
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2));
server2.reset(new MockServer(main_service, om_config_2, gcs_client_2));
// connect to stores.
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1));
@@ -299,10 +294,8 @@ class TestObjectManager : public TestObjectManagerBase {
sub_id, object_1,
[this, sub_id, object_1, object_2](
const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &clients, bool inline_object_flag,
const std::vector<uint8_t> inline_object_data,
const std::string inline_object_metadata, bool created) {
if (!clients.empty() || inline_object_flag) {
const std::unordered_set<ray::ClientID> &clients, bool created) {
if (!clients.empty()) {
TestWaitWhileSubscribed(sub_id, object_1, object_2);
}
}));
@@ -341,14 +334,7 @@ class TestObjectManager : public TestObjectManagerBase {
}
void NextWaitTest() {
int data_size;
// Set the data size under or over the inline objects limit depending on
// the test configuration.
if (test_inline_objects) {
data_size = RayConfig::instance().inline_object_max_size_bytes() / 2;
} else {
data_size = RayConfig::instance().inline_object_max_size_bytes() * 2;
}
int data_size = 600;
current_wait_test += 1;
switch (current_wait_test) {
case 0: {
@@ -499,9 +485,5 @@ int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
store_executable = std::string(argv[1]);
wait_timeout_ms = std::stoi(std::string(argv[2]));
// If a third argument is provided, then test with inline objects.
if (argc > 3) {
test_inline_objects = true;
}
return RUN_ALL_TESTS();
}
-5
View File
@@ -134,11 +134,6 @@ RAY_CONFIG(int, object_manager_repeated_push_delay_ms, 60000);
/// chunks exceeds the number of available sending threads.
RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 1000000);
/// Maximum size of an inline object (bytes).
/// Inline objects are objects whose data and metadata are inlined in the
/// GCS object table entry, which normally only specifies the object locations.
RAY_CONFIG(int64_t, inline_object_max_size_bytes, 512);
/// Number of workers per process
RAY_CONFIG(int, num_workers_per_process, 1);
+5 -11
View File
@@ -42,12 +42,10 @@ namespace raylet {
NodeManager::NodeManager(boost::asio::io_service &io_service,
const NodeManagerConfig &config, ObjectManager &object_manager,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
std::shared_ptr<ObjectDirectoryInterface> object_directory,
plasma::PlasmaClient &store_client)
std::shared_ptr<ObjectDirectoryInterface> object_directory)
: client_id_(gcs_client->client_table().GetLocalClientId()),
io_service_(io_service),
object_manager_(object_manager),
store_client_(store_client),
gcs_client_(std::move(gcs_client)),
object_directory_(std::move(object_directory)),
heartbeat_timer_(io_service),
@@ -92,6 +90,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
}));
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
[this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));
RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
}
ray::Status NodeManager::RegisterGcs() {
@@ -1286,16 +1286,10 @@ void NodeManager::TreatTaskAsFailedIfLost(const Task &task) {
object_id,
[this, task_marked_as_failed, task](
const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &clients, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool has_been_created) {
const std::unordered_set<ray::ClientID> &clients, bool has_been_created) {
if (!*task_marked_as_failed) {
// Only process the object locations if we haven't already marked the
// task as failed.
if (inline_object_flag) {
// If object is inlined, we already have its data and metadata, so return.
return;
}
if (clients.empty() && has_been_created) {
// The object does not exist on any nodes but has been created
// before, so the object has been lost. Mark the task as failed to
@@ -1957,7 +1951,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
// Notify the task dependency manager that this object is local.
const auto ready_task_ids = task_dependency_manager_.HandleObjectLocal(object_id);
RAY_LOG(DEBUG) << "Object local " << object_id << ", "
<< "on " << gcs_client_->client_table().GetLocalClientId()
<< " on " << gcs_client_->client_table().GetLocalClientId()
<< ready_task_ids.size() << " tasks ready";
// Transition the tasks whose dependencies are now fulfilled to the ready state.
if (ready_task_ids.size() > 0) {
+2 -4
View File
@@ -56,12 +56,10 @@ class NodeManager {
///
/// \param resource_config The initial set of node resources.
/// \param object_manager A reference to the local object manager.
/// \param reference to the local object store.
NodeManager(boost::asio::io_service &io_service, const NodeManagerConfig &config,
ObjectManager &object_manager,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client,
std::shared_ptr<ObjectDirectoryInterface> object_directory_,
plasma::PlasmaClient &store_client);
std::shared_ptr<ObjectDirectoryInterface> object_directory_);
/// Process a new client connection.
///
@@ -440,7 +438,7 @@ class NodeManager {
/// A Plasma object store client. This is used exclusively for creating new
/// objects in the object store (e.g., for actor tasks that can't be run
/// because the actor died).
plasma::PlasmaClient &store_client_;
plasma::PlasmaClient store_client_;
/// A client connection to the GCS.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
/// The object table. This is shared with the object manager.
+2 -5
View File
@@ -41,10 +41,9 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
: gcs_client_(gcs_client),
object_directory_(std::make_shared<ObjectDirectory>(main_service, gcs_client_)),
object_manager_(main_service, object_manager_config, object_directory_,
store_client_),
object_manager_(main_service, object_manager_config, object_directory_),
node_manager_(main_service, node_manager_config, object_manager_, gcs_client_,
object_directory_, store_client_),
object_directory_),
socket_name_(socket_name),
acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)),
socket_(main_service),
@@ -57,8 +56,6 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
boost::asio::ip::tcp::v4(),
node_manager_config.node_manager_port)),
node_manager_socket_(main_service) {
RAY_ARROW_CHECK_OK(
store_client_.Connect(node_manager_config.store_socket_name.c_str(), "", 0, 300));
// Start listening for clients.
DoAccept();
DoAcceptObjectManager();
-4
View File
@@ -73,10 +73,6 @@ class Raylet {
/// The object table. This is shared between the object manager and node
/// manager.
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
/// Reference to Plasma Store.
/// A connection to the Plasma Store. This is shared between the node manager
/// and the main thread of the object manager.
plasma::PlasmaClient store_client_;
/// Manages client requests for object transfers and availability.
ObjectManager object_manager_;
/// Manages client requests for task submission and execution.
+2 -4
View File
@@ -145,10 +145,8 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) {
created_object_id,
[this, task_id, reconstruction_attempt](
const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &clients, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata, bool created) {
if (clients.empty() && !inline_object_flag) {
const std::unordered_set<ray::ClientID> &clients, bool created) {
if (clients.empty()) {
// The required object no longer exists on any live nodes. Attempt
// reconstruction.
AttemptReconstruction(task_id, object_id, reconstruction_attempt, created);
+4 -6
View File
@@ -29,10 +29,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const ObjectID object_id = callback.first;
auto it = locations_.find(object_id);
if (it == locations_.end()) {
callback.second(object_id, std::unordered_set<ray::ClientID>(), false, {}, "",
callback.second(object_id, std::unordered_set<ray::ClientID>(),
/*created=*/false);
} else {
callback.second(object_id, it->second, false, {}, "", /*created=*/true);
callback.second(object_id, it->second, /*created=*/true);
}
}
callbacks_.clear();
@@ -60,11 +60,9 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const OnLocationsFound &));
MOCK_METHOD2(UnsubscribeObjectLocations,
ray::Status(const ray::UniqueID &, const ObjectID &));
MOCK_METHOD5(ReportObjectAdded,
MOCK_METHOD3(ReportObjectAdded,
ray::Status(const ObjectID &, const ClientID &,
const object_manager::protocol::ObjectInfoT &, bool,
const plasma::ObjectBuffer &));
const object_manager::protocol::ObjectInfoT &));
MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &));
private:
@@ -52,9 +52,6 @@ sleep 1s
# in valgrind.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000
sleep 1s
# Run tests again with inlined objects.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000 true
sleep 1s
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC
$REDIS_DIR/redis-cli -p 6379 shutdown
sleep 1s