[xray] Implement task lease table, logic for deciding when to reconstruct a task (#2497)

This commit is contained in:
Stephanie Wang
2018-07-30 17:42:28 -04:00
committed by Robert Nishihara
parent 38d00986a5
commit a45f9cfafc
19 changed files with 945 additions and 99 deletions
+1
View File
@@ -179,6 +179,7 @@ install:
- ./src/ray/raylet/worker_pool_test
- ./src/ray/raylet/lineage_cache_test
- ./src/ray/raylet/task_dependency_manager_test
- ./src/ray/raylet/reconstruction_policy_test
- bash ../src/common/test/run_tests.sh
- bash ../src/plasma/test/run_tests.sh
+11
View File
@@ -18,6 +18,10 @@ class RayConfig {
int64_t num_heartbeats_timeout() const { return num_heartbeats_timeout_; }
int64_t initial_reconstruction_timeout_milliseconds() const {
return initial_reconstruction_timeout_milliseconds_;
}
int64_t get_timeout_milliseconds() const { return get_timeout_milliseconds_; }
uint64_t max_lineage_size() const { return max_lineage_size_; }
@@ -111,6 +115,7 @@ class RayConfig {
: ray_protocol_version_(0x0000000000000000),
heartbeat_timeout_milliseconds_(100),
num_heartbeats_timeout_(100),
initial_reconstruction_timeout_milliseconds_(200),
get_timeout_milliseconds_(1000),
worker_get_request_size_(10000),
worker_fetch_request_size_(10000),
@@ -157,6 +162,12 @@ class RayConfig {
/// it as dead to the db_client table.
int64_t num_heartbeats_timeout_;
/// The initial period for a task execution lease. The lease will expire this
/// many milliseconds after the first acquisition of the lease. Nodes that
/// require an object will not try to reconstruct the task until at least
/// this many milliseconds.
int64_t initial_reconstruction_timeout_milliseconds_;
/// These are used by the worker to set timeouts and to batch requests when
/// getting objects.
int64_t get_timeout_milliseconds_;
+3
View File
@@ -15,6 +15,7 @@ AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_ty
task_table_.reset(new TaskTable(context_, this, command_type));
raylet_task_table_.reset(new raylet::TaskTable(context_, this, command_type));
task_reconstruction_log_.reset(new TaskReconstructionLog(context_, this));
task_lease_table_.reset(new TaskLeaseTable(context_, this));
heartbeat_table_.reset(new HeartbeatTable(context_, this));
error_table_.reset(new ErrorTable(primary_context_, this));
profile_table_.reset(new ProfileTable(context_, this));
@@ -75,6 +76,8 @@ TaskReconstructionLog &AsyncGcsClient::task_reconstruction_log() {
return *task_reconstruction_log_;
}
TaskLeaseTable &AsyncGcsClient::task_lease_table() { return *task_lease_table_; }
ClientTable &AsyncGcsClient::client_table() { return *client_table_; }
FunctionTable &AsyncGcsClient::function_table() { return *function_table_; }
+2
View File
@@ -56,6 +56,7 @@ class RAY_EXPORT AsyncGcsClient {
raylet::TaskTable &raylet_task_table();
ActorTable &actor_table();
TaskReconstructionLog &task_reconstruction_log();
TaskLeaseTable &task_lease_table();
ClientTable &client_table();
HeartbeatTable &heartbeat_table();
ErrorTable &error_table();
@@ -80,6 +81,7 @@ class RAY_EXPORT AsyncGcsClient {
std::unique_ptr<raylet::TaskTable> raylet_task_table_;
std::unique_ptr<ActorTable> actor_table_;
std::unique_ptr<TaskReconstructionLog> task_reconstruction_log_;
std::unique_ptr<TaskLeaseTable> task_lease_table_;
std::unique_ptr<HeartbeatTable> heartbeat_table_;
std::unique_ptr<ErrorTable> error_table_;
std::unique_ptr<ProfileTable> profile_table_;
+14
View File
@@ -16,6 +16,7 @@ enum TablePrefix:int {
HEARTBEAT,
ERROR_INFO,
PROFILE,
TASK_LEASE,
}
// The channel that Add operations to the Table should be published on, if any.
@@ -28,6 +29,7 @@ enum TablePubsub:int {
ACTOR,
HEARTBEAT,
ERROR_INFO,
TASK_LEASE,
}
table GcsTableEntry {
@@ -188,3 +190,15 @@ table HeartbeatTableData {
resources_total_label: [string];
resources_total_capacity: [double];
}
// Data for a lease on task execution.
table TaskLeaseData {
// Node manager client ID.
node_manager_id: string;
// The time that the lease was last acquired at. NOTE(swang): This is the
// system clock time according to the node that added the entry and is not
// synchronized with other nodes.
acquired_at: long;
// The period that the lease is active for.
timeout: long;
}
+11 -4
View File
@@ -321,10 +321,12 @@ void ClientTable::HandleNotification(AsyncGcsClient *client,
if (client_added_callback_ != nullptr) {
client_added_callback_(client, client_id, data);
}
RAY_CHECK(removed_clients_.find(client_id) == removed_clients_.end());
} else {
if (client_removed_callback_ != nullptr) {
client_removed_callback_(client, client_id, data);
}
removed_clients_.insert(client_id);
}
}
}
@@ -335,9 +337,13 @@ void ClientTable::HandleConnected(AsyncGcsClient *client, const ClientTableDataT
<< client_id_;
}
const ClientID &ClientTable::GetLocalClientId() { return client_id_; }
const ClientID &ClientTable::GetLocalClientId() const { return client_id_; }
const ClientTableDataT &ClientTable::GetLocalClient() { return local_client_; }
const ClientTableDataT &ClientTable::GetLocalClient() const { return local_client_; }
bool ClientTable::IsRemoved(const ClientID &client_id) const {
return removed_clients_.count(client_id) == 1;
}
Status ClientTable::Connect(const ClientTableDataT &local_client) {
RAY_CHECK(!disconnected_) << "Tried to reconnect a disconnected client.";
@@ -397,7 +403,7 @@ ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) {
return Append(JobID::nil(), client_log_key_, data, nullptr);
}
const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) {
const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) const {
RAY_CHECK(!client_id.is_nil());
auto entry = client_cache_.find(client_id);
if (entry != client_cache_.end()) {
@@ -405,7 +411,7 @@ const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) {
} else {
// If the requested client was not found, return a reference to the nil
// client entry.
return client_cache_[ClientID::nil()];
return client_cache_.at(ClientID::nil());
}
}
@@ -415,6 +421,7 @@ template class Table<TaskID, ray::protocol::Task>;
template class Table<TaskID, TaskTableData>;
template class Log<ActorID, ActorTableData>;
template class Log<TaskID, TaskReconstructionData>;
template class Table<TaskID, TaskLeaseData>;
template class Table<ClientID, HeartbeatTableData>;
template class Log<JobID, ErrorTableData>;
template class Log<UniqueID, ClientTableData>;
+21 -3
View File
@@ -4,6 +4,7 @@
#include <map>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include "ray/constants.h"
#include "ray/id.h"
@@ -346,6 +347,15 @@ class TaskReconstructionLog : public Log<TaskID, TaskReconstructionData> {
}
};
class TaskLeaseTable : public Table<TaskID, TaskLeaseData> {
public:
TaskLeaseTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: Table(context, client) {
pubsub_channel_ = TablePubsub::TASK_LEASE;
prefix_ = TablePrefix::TASK_LEASE;
}
};
namespace raylet {
class TaskTable : public Table<TaskID, ray::protocol::Task> {
@@ -578,17 +588,23 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// \param client The client to get information about.
/// \return A reference to the requested client. If the client is not in the
/// cache, then an entry with a nil ClientID will be returned.
const ClientTableDataT &GetClient(const ClientID &client);
const ClientTableDataT &GetClient(const ClientID &client) const;
/// Get the local client's ID.
///
/// \return The local client's ID.
const ClientID &GetLocalClientId();
const ClientID &GetLocalClientId() const;
/// Get the local client's information.
///
/// \return The local client's information.
const ClientTableDataT &GetLocalClient();
const ClientTableDataT &GetLocalClient() const;
/// Check whether the given client is removed.
///
/// \param client_id The ID of the client to check.
/// \return Whether the client with ID client_id is removed.
bool IsRemoved(const ClientID &client_id) const;
private:
/// Handle a client table notification.
@@ -612,6 +628,8 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
ClientTableCallback client_removed_callback_;
/// A cache for information about all clients.
std::unordered_map<ClientID, ClientTableDataT> client_cache_;
/// The set of removed clients.
std::unordered_set<ClientID> removed_clients_;
};
} // namespace gcs
+23 -10
View File
@@ -2,13 +2,15 @@
namespace ray {
ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client) {
gcs_client_ = gcs_client;
}
ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client)
: gcs_client_(gcs_client) {}
namespace {
std::vector<ClientID> UpdateObjectLocations(
std::unordered_set<ClientID> &client_ids,
const std::vector<ObjectTableDataT> &location_history) {
const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table) {
// location_history contains the history of locations of the object (it is a log),
// which might look like the following:
// client1.is_eviction = false
@@ -24,9 +26,19 @@ std::vector<ClientID> UpdateObjectLocations(
client_ids.erase(client_id);
}
}
// Filter out the removed clients from the object locations.
for (auto it = client_ids.begin(); it != client_ids.end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids.erase(it);
} else {
it++;
}
}
return std::vector<ClientID>(client_ids.begin(), client_ids.end());
}
} // namespace
void ObjectDirectory::RegisterBackend() {
auto object_notification_callback = [this](
gcs::AsyncGcsClient *client, const ObjectID &object_id,
@@ -38,8 +50,9 @@ void ObjectDirectory::RegisterBackend() {
return;
}
// Update entries for this object.
std::vector<ClientID> client_id_vec = UpdateObjectLocations(
object_id_listener_pair->second.current_object_locations, location_history);
std::vector<ClientID> client_id_vec =
UpdateObjectLocations(object_id_listener_pair->second.current_object_locations,
location_history, gcs_client_->client_table());
if (!client_id_vec.empty()) {
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
@@ -148,12 +161,12 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
JobID job_id = JobID::nil();
ray::Status status = gcs_client_->object_table().Lookup(
job_id, object_id,
[callback](gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
[this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
std::vector<ClientID> locations_vector =
UpdateObjectLocations(client_ids, location_history);
std::vector<ClientID> locations_vector = UpdateObjectLocations(
client_ids, location_history, gcs_client_->client_table());
callback(locations_vector, object_id);
});
return status;
+2 -2
View File
@@ -141,10 +141,10 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_set<ClientID> current_object_locations;
};
/// Info about subscribers to object locations.
std::unordered_map<ObjectID, LocationListenerState> listeners_;
/// Reference to the gcs client.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
/// Info about subscribers to object locations.
std::unordered_map<ObjectID, LocationListenerState> listeners_;
/// Map from object ID to the number of times it's been evicted on this
/// node before.
std::unordered_map<ObjectID, int> object_evictions_;
+1
View File
@@ -37,6 +37,7 @@ ADD_RAY_TEST(worker_pool_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} $
ADD_RAY_TEST(task_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})
ADD_RAY_TEST(lineage_cache_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})
ADD_RAY_TEST(task_dependency_manager_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})
ADD_RAY_TEST(reconstruction_policy_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY})
include_directories(${GCS_FBS_OUTPUT_DIRECTORY})
add_library(rayletlib raylet.cc ${NODE_MANAGER_FBS_OUTPUT_FILES})
+46 -38
View File
@@ -5,7 +5,6 @@
// gen_local_scheduler_fbs from src/ray/CMakeLists.txt.
#include "local_scheduler/format/local_scheduler_generated.h"
#include "ray/raylet/format/node_manager_generated.h"
#include "ray/util/util.h"
namespace {
@@ -91,8 +90,16 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
config.worker_command),
local_queues_(SchedulingQueue()),
scheduling_policy_(local_queues_),
reconstruction_policy_([this](const TaskID &task_id) { ResubmitTask(task_id); }),
task_dependency_manager_(object_manager),
reconstruction_policy_(
io_service_, [this](const TaskID &task_id) { ResubmitTask(task_id); },
RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
gcs_client_->client_table().GetLocalClientId(), gcs_client->task_lease_table(),
std::make_shared<ObjectDirectory>(gcs_client)),
task_dependency_manager_(
object_manager, reconstruction_policy_, io_service,
gcs_client_->client_table().GetLocalClientId(),
RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
gcs_client->task_lease_table()),
lineage_cache_(gcs_client_->client_table().GetLocalClientId(),
gcs_client->raylet_task_table(), gcs_client->raylet_task_table(),
config.max_lineage_size),
@@ -130,6 +137,30 @@ ray::Status NodeManager::RegisterGcs() {
JobID::nil(), gcs_client_->client_table().GetLocalClientId(),
task_committed_callback, nullptr, nullptr));
const auto task_lease_notification_callback = [this](gcs::AsyncGcsClient *client,
const TaskID &task_id,
const TaskLeaseDataT &task_lease) {
const ClientID node_manager_id = ClientID::from_binary(task_lease.node_manager_id);
if (gcs_client_->client_table().IsRemoved(node_manager_id)) {
// The node manager that added the task lease is already removed. The
// lease is considered inactive.
reconstruction_policy_.HandleTaskLeaseNotification(task_id, 0);
} else {
// NOTE(swang): The task_lease.timeout is an overestimate of the lease's
// expiration period since the entry may have been in the GCS for some
// time already. For a more accurate estimate, the age of the entry in
// the GCS should be subtracted from task_lease.timeout.
reconstruction_policy_.HandleTaskLeaseNotification(task_id, task_lease.timeout);
}
};
const auto task_lease_empty_callback = [this](gcs::AsyncGcsClient *client,
const TaskID &task_id) {
reconstruction_policy_.HandleTaskLeaseNotification(task_id, 0);
};
RAY_RETURN_NOT_OK(gcs_client_->task_lease_table().Subscribe(
JobID::nil(), gcs_client_->client_table().GetLocalClientId(),
task_lease_notification_callback, task_lease_empty_callback, nullptr));
// Register a callback for actor creation notifications.
auto actor_creation_callback = [this](
gcs::AsyncGcsClient *client, const ActorID &actor_id,
@@ -212,14 +243,6 @@ void NodeManager::Heartbeat() {
void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
ClientID client_id = ClientID::from_binary(client_data.client_id);
// Make sure the client hasn't already been removed.
if (removed_clients_.find(client_id) != removed_clients_.end()) {
// This client has already been removed, so don't do anything.
RAY_LOG(INFO) << "The client " << client_id << " has already been removed, so it "
<< "can't be added. This is very unusual.";
return;
}
RAY_LOG(DEBUG) << "[ClientAdded] received callback from client id " << client_id;
if (client_id == gcs_client_->client_table().GetLocalClientId()) {
// We got a notification for ourselves, so we are connected to the GCS now.
@@ -258,17 +281,11 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
}
void NodeManager::ClientRemoved(const ClientTableDataT &client_data) {
// TODO(swang): If we receive a notification for our own death, clean up and
// exit immediately.
const ClientID client_id = ClientID::from_binary(client_data.client_id);
RAY_LOG(DEBUG) << "[ClientRemoved] received callback from client id " << client_id;
// If the client has already been removed, don't do anything.
if (removed_clients_.find(client_id) != removed_clients_.end()) {
RAY_LOG(INFO) << "The client " << client_id << " has already been removed. This "
<< "should be very unusual.";
return;
}
removed_clients_.insert(client_id);
RAY_CHECK(client_id != gcs_client_->client_table().GetLocalClientId())
<< "Exiting because this node manager has mistakenly been marked dead by the "
<< "monitor.";
@@ -767,10 +784,6 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
bool forwarded) {
// Add the task and its uncommitted lineage to the lineage cache.
lineage_cache_.AddWaitingTask(task, uncommitted_lineage);
// Mark the task as pending. Once the task has finished execution, or once it
// has been forwarded to another node, the task must be marked as canceled in
// the TaskDependencyManager.
task_dependency_manager_.TaskPending(task);
const TaskSpecification &spec = task.GetTaskSpecification();
if (spec.IsActorTask()) {
@@ -791,7 +804,7 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
} else {
// The actor is remote. Forward the task to the node manager that owns
// the actor.
if (removed_clients_.find(node_manager_id) != removed_clients_.end()) {
if (gcs_client_->client_table().IsRemoved(node_manager_id)) {
// The remote node manager is dead, so handle the fact that this actor
// is also dead.
TreatTaskAsFailed(spec);
@@ -824,6 +837,10 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
// Keep the task queued until we discover the actor's location.
// (See design_docs/task_states.rst for the state transition diagram.)
local_queues_.QueueMethodsWaitingForActorCreation({task});
// Mark the task as pending. It will be canceled once we discover the
// actor's location and either execute the task ourselves or forward it
// to another node.
task_dependency_manager_.TaskPending(task);
}
} else {
// This is a non-actor task. Queue the task for a placement decision or for dispatch
@@ -914,20 +931,11 @@ void NodeManager::HandleWorkerUnblocked(std::shared_ptr<Worker> worker) {
worker->MarkUnblocked();
}
void NodeManager::HandleRemoteDependencyRequired(const ObjectID &dependency_id) {
// Try to fetch the object from the object manager.
RAY_CHECK_OK(object_manager_.Pull(dependency_id));
// TODO(swang): Request reconstruction of the object, possibly after a
// timeout.
}
void NodeManager::HandleRemoteDependencyCanceled(const ObjectID &dependency_id) {
// Cancel the fetch request from the object manager.
RAY_CHECK_OK(object_manager_.Cancel(dependency_id));
// TODO(swang): Cancel reconstruction of the object.
}
void NodeManager::EnqueuePlaceableTask(const Task &task) {
// Mark the task as pending. Once the task has finished execution, or once it
// has been forwarded to another node, the task must be marked as canceled in
// the TaskDependencyManager.
task_dependency_manager_.TaskPending(task);
// TODO(atumanov): add task lookup hashmap and change EnqueuePlaceableTask to take
// a vector of TaskIDs. Trigger MoveTask internally.
// Subscribe to the task's dependencies.
@@ -1101,7 +1109,7 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
}
void NodeManager::ResubmitTask(const TaskID &task_id) {
throw std::runtime_error("Method not implemented");
RAY_LOG(WARNING) << "Task re-execution is not currently implemented";
}
void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
-11
View File
@@ -137,14 +137,6 @@ class NodeManager {
/// \return Void.
void CleanUpTasksForDeadActor(const ActorID &actor_id);
/// Methods for managing object dependencies.
/// Handle a dependency required by a queued task that is missing locally.
/// The dependency is (1) on a remote node, (2) pending creation on a remote
/// node, or (3) missing from all nodes and requires reconstruction.
void HandleRemoteDependencyRequired(const ObjectID &dependency_id);
/// Handle a dependency that was previously required by a queued task that is
/// no longer required.
void HandleRemoteDependencyCanceled(const ObjectID &dependency_id);
/// Handle an object becoming local. This updates any local accounting, but
/// does not write to any global accounting in the GCS.
void HandleObjectLocal(const ObjectID &object_id);
@@ -180,9 +172,6 @@ class NodeManager {
/// The lineage cache for the GCS object and task tables.
LineageCache lineage_cache_;
std::vector<ClientID> remote_clients_;
/// A set of all of the remote clients that have been removed. In principle,
/// this could grow unbounded.
std::unordered_set<ClientID> removed_clients_;
std::unordered_map<ClientID, TcpServerConnection> remote_server_connections_;
/// A mapping from actor ID to registration information about that actor
/// (including which node manager owns it).
+163 -2
View File
@@ -4,8 +4,169 @@ namespace ray {
namespace raylet {
void ReconstructionPolicy::CheckObjectReconstruction(const ObjectID &object) {
throw std::runtime_error("Method not implemented");
ReconstructionPolicy::ReconstructionPolicy(
boost::asio::io_service &io_service,
std::function<void(const TaskID &)> reconstruction_handler,
int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
gcs::PubsubInterface<TaskID> &task_lease_pubsub,
std::shared_ptr<ObjectDirectoryInterface> object_directory)
: io_service_(io_service),
reconstruction_handler_(reconstruction_handler),
initial_reconstruction_timeout_ms_(initial_reconstruction_timeout_ms),
client_id_(client_id),
task_lease_pubsub_(task_lease_pubsub),
object_directory_(std::move(object_directory)) {}
void ReconstructionPolicy::SetTaskTimeout(
std::unordered_map<TaskID, ReconstructionTask>::iterator task_it,
int64_t timeout_ms) {
task_it->second.expires_at = current_time_ms() + timeout_ms;
auto timeout = boost::posix_time::milliseconds(timeout_ms);
task_it->second.reconstruction_timer->expires_from_now(timeout);
const TaskID task_id = task_it->first;
task_it->second.reconstruction_timer->async_wait(
[this, task_id](const boost::system::error_code &error) {
if (!error) {
auto it = listening_tasks_.find(task_id);
if (it == listening_tasks_.end()) {
return;
}
if (it->second.subscribed) {
// If the timer expired and we were subscribed to notifications,
// then this means that we did not receive a task lease
// notification within the lease period. Otherwise, the timer
// would have been reset when the most recent notification was
// received. The current lease is now considered expired.
HandleTaskLeaseExpired(task_id);
} else {
// This task is still required, so subscribe to task lease
// notifications. Reconstruction will be triggered if the current
// task lease expires, or if no one has acquired the task lease.
// NOTE(swang): When reconstruction for a task is first requested,
// we do not initially subscribe to task lease notifications, which
// requires at least one GCS operation. This is in case the objects
// required by the task are no longer needed soon after. If the
// task is still required after this initial period, then we now
// subscribe to task lease notifications.
RAY_CHECK_OK(task_lease_pubsub_.RequestNotifications(JobID::nil(), task_id,
client_id_));
it->second.subscribed = true;
}
} else {
// Check that the error was due to the timer being canceled.
RAY_CHECK(error == boost::asio::error::operation_aborted);
}
});
}
void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
const ObjectID &required_object_id,
int reconstruction_attempt) {
// If we are no longer listening for objects created by this task, give up.
auto it = listening_tasks_.find(task_id);
if (it == listening_tasks_.end()) {
return;
}
// If the object is no longer required, give up.
if (it->second.created_objects.count(required_object_id) == 0) {
return;
}
// Suppress duplicate reconstructions of the same task. This can happen if,
// for example, a task creates two different objects that both require
// reconstruction.
if (reconstruction_attempt != it->second.reconstruction_attempt) {
// Through some other path, reconstruction was already attempted more than
// reconstruction_attempt many times.
return;
}
// Increment the number of times reconstruction has been attempted. This is
// used to suppress duplicate reconstructions of the same task.
it->second.reconstruction_attempt++;
// Reset the timer to wait for task lease notifications again. NOTE(swang):
// The timer should already be set here, but we extend it to give some time
// for the reconstructed task to propagate notifications.
SetTaskTimeout(it, initial_reconstruction_timeout_ms_);
// TODO(swang): Suppress simultaneous attempts to reconstruct the task using
// the task reconstruction log.
reconstruction_handler_(task_id);
}
void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) {
auto it = listening_tasks_.find(task_id);
RAY_CHECK(it != listening_tasks_.end());
int reconstruction_attempt = it->second.reconstruction_attempt;
// Lookup the objects created by this task in the object directory. If any
// objects no longer exist on any live nodes, then reconstruction will be
// attempted asynchronously.
for (const auto &created_object_id : it->second.created_objects) {
RAY_CHECK_OK(object_directory_->LookupLocations(
created_object_id,
[this, task_id, reconstruction_attempt](const std::vector<ray::ClientID> &clients,
const ray::ObjectID &object_id) {
if (clients.empty()) {
// The required object no longer exists on any live nodes. Attempt
// reconstruction.
AttemptReconstruction(task_id, object_id, reconstruction_attempt);
}
}));
}
// Reset the timer to wait for task lease notifications again.
SetTaskTimeout(it, initial_reconstruction_timeout_ms_);
}
void ReconstructionPolicy::HandleTaskLeaseNotification(const TaskID &task_id,
int64_t lease_timeout_ms) {
auto it = listening_tasks_.find(task_id);
if (it == listening_tasks_.end()) {
// We are no longer listening for this task, so ignore the notification.
return;
}
if (lease_timeout_ms == 0) {
HandleTaskLeaseExpired(task_id);
} else if ((current_time_ms() + lease_timeout_ms) > it->second.expires_at) {
// The current lease is longer than the timer's current expiration time.
// Reset the timer according to the current lease.
SetTaskTimeout(it, lease_timeout_ms);
}
}
void ReconstructionPolicy::ListenAndMaybeReconstruct(const ObjectID &object_id) {
TaskID task_id = ComputeTaskId(object_id);
auto it = listening_tasks_.find(task_id);
// Add this object to the list of objects created by the same task.
if (it == listening_tasks_.end()) {
auto inserted = listening_tasks_.emplace(task_id, ReconstructionTask(io_service_));
it = inserted.first;
// Set a timer for the task that created the object. If the lease for that
// task expires, then reconstruction of that task will be triggered.
SetTaskTimeout(it, initial_reconstruction_timeout_ms_);
}
it->second.created_objects.insert(object_id);
}
void ReconstructionPolicy::Cancel(const ObjectID &object_id) {
TaskID task_id = ComputeTaskId(object_id);
auto it = listening_tasks_.find(task_id);
if (it == listening_tasks_.end()) {
// We already stopped listening for this task.
return;
}
it->second.created_objects.erase(object_id);
// If there are no more needed objects created by this task, stop listening
// for notifications.
if (it->second.created_objects.empty()) {
listening_tasks_.erase(it);
// Cancel notifications for the task lease if we were subscribed to them.
if (it->second.subscribed) {
RAY_CHECK_OK(
task_lease_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_));
}
}
}
} // namespace raylet
+109 -9
View File
@@ -2,33 +2,133 @@
#define RAY_RAYLET_RECONSTRUCTION_POLICY_H
#include <functional>
#include <unordered_map>
#include <unordered_set>
#include <boost/asio.hpp>
#include "ray/gcs/tables.h"
#include "ray/id.h"
#include "ray/util/util.h"
#include "ray/object_manager/object_directory.h"
namespace ray {
namespace raylet {
// TODO(swang): Use std::function instead of boost.
class ReconstructionPolicyInterface {
public:
virtual void ListenAndMaybeReconstruct(const ObjectID &object_id) = 0;
virtual void Cancel(const ObjectID &object_id) = 0;
virtual ~ReconstructionPolicyInterface(){};
};
class ReconstructionPolicy {
class ReconstructionPolicy : public ReconstructionPolicyInterface {
public:
/// Create the reconstruction policy.
///
/// \param io_service The event loop to attach reconstruction timers to.
/// \param reconstruction_handler The handler to call if a task needs to be
/// re-executed.
// TODO(swang): This requires at minimum references to the Raylet's lineage
// cache and GCS client.
ReconstructionPolicy(std::function<void(const TaskID &)> reconstruction_handler) {}
/// \param initial_reconstruction_timeout_ms The initial timeout within which
/// a task lease notification must be received. Otherwise, reconstruction
/// will be triggered.
/// \param client_id The client ID to use when requesting notifications from
/// the GCS.
/// \param task_lease_pubsub The GCS pub-sub storage system to request task
/// lease notifications from.
ReconstructionPolicy(boost::asio::io_service &io_service,
std::function<void(const TaskID &)> reconstruction_handler,
int64_t initial_reconstruction_timeout_ms,
const ClientID &client_id,
gcs::PubsubInterface<TaskID> &task_lease_pubsub,
std::shared_ptr<ObjectDirectoryInterface> object_directory);
/// Check whether an object requires reconstruction. If this object requires
/// reconstruction, the registered task reconstruction handler will be called
/// for each task that needs to be re-executed.
/// Listen for task lease notifications about an object that may require
/// reconstruction. If no notifications are received within the initial
/// timeout, then the registered task reconstruction handler will be called
/// for the task that created the object.
///
/// \param object_id The object to check for reconstruction.
void CheckObjectReconstruction(const ObjectID &object_id);
void ListenAndMaybeReconstruct(const ObjectID &object_id);
/// Cancel listening for an object. Notifications for the object will be
/// ignored. This does not cancel a reconstruction attempt that is already in
/// progress.
///
/// \param object_id The object to cancel.
void Cancel(const ObjectID &object_id);
/// Handle a notification for a task lease. This handler should be called to
/// indicate that a task is currently being executed, so any objects that it
/// creates should not be reconstructed.
///
/// \param task_id The task ID of the task being executed.
/// \param lease_timeout_ms After this timeout, the task's lease is
/// guaranteed to be expired. If a second notification is not received within
/// this timeout, then objects that the task creates may be reconstructed.
void HandleTaskLeaseNotification(const TaskID &task_id, int64_t lease_timeout_ms);
private:
struct ReconstructionTask {
ReconstructionTask(boost::asio::io_service &io_service)
: expires_at(INT64_MAX),
subscribed(false),
reconstruction_attempt(0),
reconstruction_timer(new boost::asio::deadline_timer(io_service)) {}
// The objects created by this task that we are listening for notifications for.
std::unordered_set<ObjectID> created_objects;
// The time at which the timer for this task expires, according to this
// node's steady clock.
int64_t expires_at;
// Whether we are subscribed to lease notifications for this task.
bool subscribed;
// The number of times we've attempted reconstructing this task so far.
int reconstruction_attempt;
// The task's reconstruction timer. If this expires before a lease
// notification is received, then the task will be reconstructed.
std::unique_ptr<boost::asio::deadline_timer> reconstruction_timer;
};
/// Set the reconstruction timer for a task. If no task lease notifications
/// are received within the timeout, then reconstruction will be triggered.
/// If the timer was previously set, this method will cancel it and reset the
/// timer to the new timeout.
void SetTaskTimeout(std::unordered_map<TaskID, ReconstructionTask>::iterator task_it,
int64_t timeout_ms);
/// Attempt to re-execute a task to reconstruct the required object.
///
/// \param task_id The task to attempt to re-execute.
/// \param required_object_id The object created by the task that requires
/// reconstruction.
/// \param reconstruction_attempt What number attempt this is at
/// reconstructing the task. This is used to suppress duplicate
/// reconstructions of the same task (e.g., if a task creates two objects
/// that both require reconstruction).
void AttemptReconstruction(const TaskID &task_id, const ObjectID &required_object_id,
int reconstruction_attempt);
/// Handle expiration of a task lease.
void HandleTaskLeaseExpired(const TaskID &task_id);
/// The event loop.
boost::asio::io_service &io_service_;
/// The handler to call for tasks that require reconstruction.
const std::function<void(const TaskID &)> reconstruction_handler_;
/// The initial timeout within which a task lease notification must be
/// received. Otherwise, reconstruction will be triggered.
const int64_t initial_reconstruction_timeout_ms_;
/// The client ID to use when requesting notifications from the GCS.
const ClientID client_id_;
/// The GCS pub-sub storage system to request task lease notifications from.
gcs::PubsubInterface<TaskID> &task_lease_pubsub_;
/// The object directory used to lookup object locations.
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
/// The tasks that we are currently subscribed to in the GCS.
std::unordered_map<TaskID, ReconstructionTask> listening_tasks_;
};
} // namespace raylet
@@ -0,0 +1,330 @@
#include <list>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <boost/asio.hpp>
#include "ray/raylet/format/node_manager_generated.h"
#include "ray/raylet/reconstruction_policy.h"
#include "ray/object_manager/object_directory.h"
namespace ray {
namespace raylet {
class MockObjectDirectory : public ObjectDirectoryInterface {
public:
MockObjectDirectory() {}
ray::Status LookupLocations(const ObjectID &object_id,
const OnLocationsFound &callback) {
callbacks_.push_back({object_id, callback});
return ray::Status::OK();
}
void FlushCallbacks() {
for (const auto &callback : callbacks_) {
const ObjectID object_id = callback.first;
callback.second(locations_[object_id], object_id);
}
callbacks_.clear();
}
void SetObjectLocations(const ObjectID &object_id, std::vector<ClientID> locations) {
locations_[object_id] = locations;
}
MOCK_METHOD0(RegisterBackend, void(void));
MOCK_METHOD3(GetInformation, ray::Status(const ClientID &, const InfoSuccessCallback &,
const InfoFailureCallback &));
MOCK_METHOD3(SubscribeObjectLocations,
ray::Status(const ray::UniqueID &, const ObjectID &,
const OnLocationsFound &));
MOCK_METHOD2(UnsubscribeObjectLocations,
ray::Status(const ray::UniqueID &, const ObjectID &));
MOCK_METHOD3(ReportObjectAdded,
ray::Status(const ObjectID &, const ClientID &, const ObjectInfoT &));
MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &));
private:
std::vector<std::pair<ObjectID, OnLocationsFound>> callbacks_;
std::unordered_map<ObjectID, std::vector<ClientID>> locations_;
};
class MockGcs : public gcs::PubsubInterface<TaskID> {
public:
MockGcs() : notification_callback_(nullptr), failure_callback_(nullptr){};
void Subscribe(const gcs::TaskLeaseTable::WriteCallback &notification_callback,
const gcs::TaskLeaseTable::FailureCallback &failure_callback) {
notification_callback_ = notification_callback;
failure_callback_ = failure_callback;
}
void Add(const JobID &job_id, const TaskID &task_id,
std::shared_ptr<TaskLeaseDataT> &task_lease_data) {
task_lease_table_[task_id] = task_lease_data;
if (subscribed_tasks_.count(task_id) == 1) {
notification_callback_(nullptr, task_id, *task_lease_data);
}
}
Status RequestNotifications(const JobID &job_id, const TaskID &task_id,
const ClientID &client_id) {
subscribed_tasks_.insert(task_id);
auto entry = task_lease_table_.find(task_id);
if (entry == task_lease_table_.end()) {
failure_callback_(nullptr, task_id);
} else {
notification_callback_(nullptr, task_id, *entry->second);
}
return ray::Status::OK();
}
Status CancelNotifications(const JobID &job_id, const TaskID &task_id,
const ClientID &client_id) {
subscribed_tasks_.erase(task_id);
return ray::Status::OK();
}
private:
gcs::TaskLeaseTable::WriteCallback notification_callback_;
gcs::TaskLeaseTable::FailureCallback failure_callback_;
std::unordered_map<TaskID, std::shared_ptr<TaskLeaseDataT>> task_lease_table_;
std::unordered_set<TaskID> subscribed_tasks_;
};
class ReconstructionPolicyTest : public ::testing::Test {
public:
ReconstructionPolicyTest()
: io_service_(),
mock_gcs_(),
mock_object_directory_(std::make_shared<MockObjectDirectory>()),
reconstruction_timeout_ms_(50),
reconstruction_policy_(std::make_shared<ReconstructionPolicy>(
io_service_,
[this](const TaskID &task_id) { TriggerReconstruction(task_id); },
reconstruction_timeout_ms_, ClientID::from_random(), mock_gcs_,
mock_object_directory_)),
timer_canceled_(false) {
mock_gcs_.Subscribe(
[this](gcs::AsyncGcsClient *client, const TaskID &task_id,
const TaskLeaseDataT &task_lease) {
reconstruction_policy_->HandleTaskLeaseNotification(task_id,
task_lease.timeout);
},
[this](gcs::AsyncGcsClient *client, const TaskID &task_id) {
reconstruction_policy_->HandleTaskLeaseNotification(task_id, 0);
});
}
void TriggerReconstruction(const TaskID &task_id) { reconstructed_tasks_[task_id]++; }
void Tick(const std::function<void(void)> &handler,
std::shared_ptr<boost::asio::deadline_timer> timer,
boost::posix_time::milliseconds timer_period,
const boost::system::error_code &error) {
if (timer_canceled_) {
return;
}
ASSERT_FALSE(error);
handler();
// Fire the timer again after another period.
timer->expires_from_now(timer_period);
timer->async_wait(
[this, handler, timer, timer_period](const boost::system::error_code &error) {
Tick(handler, timer, timer_period, error);
});
}
void SetPeriodicTimer(uint64_t period_ms, const std::function<void(void)> &handler) {
timer_canceled_ = false;
auto timer_period = boost::posix_time::milliseconds(period_ms);
auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_, timer_period);
timer->async_wait(
[this, handler, timer, timer_period](const boost::system::error_code &error) {
Tick(handler, timer, timer_period, error);
});
}
void CancelPeriodicTimer() { timer_canceled_ = true; }
void Run(uint64_t reconstruction_timeout_ms) {
auto timer_period = boost::posix_time::milliseconds(reconstruction_timeout_ms);
auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_, timer_period);
timer->async_wait([this, timer](const boost::system::error_code &error) {
ASSERT_FALSE(error);
io_service_.stop();
});
io_service_.run();
io_service_.reset();
mock_object_directory_->FlushCallbacks();
}
protected:
boost::asio::io_service io_service_;
MockGcs mock_gcs_;
std::shared_ptr<MockObjectDirectory> mock_object_directory_;
uint64_t reconstruction_timeout_ms_;
std::shared_ptr<ReconstructionPolicy> reconstruction_policy_;
bool timer_canceled_;
std::unordered_map<TaskID, int> reconstructed_tasks_;
};
TEST_F(ReconstructionPolicyTest, TestReconstructionSimple) {
TaskID task_id = TaskID::from_random();
task_id = FinishTaskId(task_id);
ObjectID object_id = ComputeReturnId(task_id, 1);
// Listen for an object.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
// Run the test for longer than the reconstruction timeout.
Run(reconstruction_timeout_ms_ * 1.1);
// Check that reconstruction was triggered for the task that created the
// object.
ASSERT_EQ(reconstructed_tasks_[task_id], 1);
// Run the test again.
Run(reconstruction_timeout_ms_ * 1.1);
// Check that reconstruction was triggered again.
ASSERT_EQ(reconstructed_tasks_[task_id], 2);
}
TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) {
TaskID task_id = TaskID::from_random();
task_id = FinishTaskId(task_id);
ObjectID object_id = ComputeReturnId(task_id, 1);
mock_object_directory_->SetObjectLocations(object_id, {ClientID::from_random()});
// Listen for both objects.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
// Run the test for longer than the reconstruction timeout.
Run(reconstruction_timeout_ms_ * 1.1);
// Check that reconstruction was not triggered, since the objects still
// exist on a live node.
ASSERT_EQ(reconstructed_tasks_[task_id], 0);
// Simulate evicting one of the objects.
mock_object_directory_->SetObjectLocations(object_id, {});
// Run the test again.
Run(reconstruction_timeout_ms_ * 1.1);
// Check that reconstruction was triggered, since one of the objects was
// evicted.
ASSERT_EQ(reconstructed_tasks_[task_id], 1);
}
TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) {
// Create two object IDs produced by the same task.
TaskID task_id = TaskID::from_random();
task_id = FinishTaskId(task_id);
ObjectID object_id1 = ComputeReturnId(task_id, 1);
ObjectID object_id2 = ComputeReturnId(task_id, 2);
// Listen for both objects.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id1);
reconstruction_policy_->ListenAndMaybeReconstruct(object_id2);
// Run the test for longer than the reconstruction timeout.
Run(reconstruction_timeout_ms_ * 1.1);
// Check that reconstruction is only triggered once for the task that created
// both objects.
ASSERT_EQ(reconstructed_tasks_[task_id], 1);
// Run the test again.
Run(reconstruction_timeout_ms_ * 1.1);
// Check that reconstruction is again only triggered once.
ASSERT_EQ(reconstructed_tasks_[task_id], 2);
}
TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) {
TaskID task_id = TaskID::from_random();
task_id = FinishTaskId(task_id);
ObjectID object_id = ComputeReturnId(task_id, 1);
// Run the test for much longer than the reconstruction timeout.
int64_t test_period = 2 * reconstruction_timeout_ms_;
// Acquire the task lease for a period longer than the test period.
auto task_lease_data = std::make_shared<TaskLeaseDataT>();
task_lease_data->node_manager_id = ClientID::from_random().hex();
task_lease_data->acquired_at = current_sys_time_ms();
task_lease_data->timeout = 2 * test_period;
mock_gcs_.Add(DriverID::nil(), task_id, task_lease_data);
// Listen for an object.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
// Run the test.
Run(test_period);
// Check that reconstruction is suppressed by the active task lease.
ASSERT_TRUE(reconstructed_tasks_.empty());
// Run the test again past the expiration time of the lease.
Run(task_lease_data->timeout * 1.1);
// Check that this time, reconstruction is triggered.
ASSERT_EQ(reconstructed_tasks_[task_id], 1);
}
TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) {
TaskID task_id = TaskID::from_random();
task_id = FinishTaskId(task_id);
ObjectID object_id = ComputeReturnId(task_id, 1);
// Listen for an object.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
// Send the reconstruction manager heartbeats about the object.
SetPeriodicTimer(reconstruction_timeout_ms_ / 2, [this, task_id]() {
auto task_lease_data = std::make_shared<TaskLeaseDataT>();
task_lease_data->node_manager_id = ClientID::from_random().hex();
task_lease_data->acquired_at = current_sys_time_ms();
task_lease_data->timeout = reconstruction_timeout_ms_;
mock_gcs_.Add(DriverID::nil(), task_id, task_lease_data);
});
// Run the test for much longer than the reconstruction timeout.
Run(reconstruction_timeout_ms_ * 2);
// Check that reconstruction is suppressed.
ASSERT_TRUE(reconstructed_tasks_.empty());
// Cancel the heartbeats to the reconstruction manager.
CancelPeriodicTimer();
// Run the test again.
Run(reconstruction_timeout_ms_ * 1.1);
// Check that this time, reconstruction is triggered.
ASSERT_EQ(reconstructed_tasks_[task_id], 1);
}
TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) {
TaskID task_id = TaskID::from_random();
task_id = FinishTaskId(task_id);
ObjectID object_id = ComputeReturnId(task_id, 1);
// Listen for an object.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
// Halfway through the reconstruction timeout, cancel the object
// reconstruction.
auto timer_period = boost::posix_time::milliseconds(reconstruction_timeout_ms_);
auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_, timer_period);
timer->async_wait([this, timer, object_id](const boost::system::error_code &error) {
ASSERT_FALSE(error);
reconstruction_policy_->Cancel(object_id);
});
Run(reconstruction_timeout_ms_ * 2);
// Check that reconstruction is suppressed.
ASSERT_TRUE(reconstructed_tasks_.empty());
// Listen for the object again.
reconstruction_policy_->ListenAndMaybeReconstruct(object_id);
// Run the test again.
Run(reconstruction_timeout_ms_ * 1.1);
// Check that this time, reconstruction is triggered.
ASSERT_EQ(reconstructed_tasks_[task_id], 1);
}
} // namespace raylet
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
+73 -13
View File
@@ -4,8 +4,18 @@ namespace ray {
namespace raylet {
TaskDependencyManager::TaskDependencyManager(ObjectManagerInterface &object_manager)
: object_manager_(object_manager) {}
TaskDependencyManager::TaskDependencyManager(
ObjectManagerInterface &object_manager,
ReconstructionPolicyInterface &reconstruction_policy,
boost::asio::io_service &io_service, const ClientID &client_id,
int64_t initial_lease_period_ms,
gcs::TableInterface<TaskID, TaskLeaseData> &task_lease_table)
: object_manager_(object_manager),
reconstruction_policy_(reconstruction_policy),
io_service_(io_service),
client_id_(client_id),
initial_lease_period_ms_(initial_lease_period_ms),
task_lease_table_(task_lease_table) {}
bool TaskDependencyManager::CheckObjectLocal(const ObjectID &object_id) const {
return local_objects_.count(object_id) == 1;
@@ -44,6 +54,7 @@ void TaskDependencyManager::HandleRemoteDependencyRequired(const ObjectID &objec
// If we haven't already, request the object manager to pull it from a
// remote node.
RAY_CHECK_OK(object_manager_.Pull(object_id));
reconstruction_policy_.ListenAndMaybeReconstruct(object_id);
}
}
}
@@ -55,6 +66,7 @@ void TaskDependencyManager::HandleRemoteDependencyCanceled(const ObjectID &objec
auto it = required_objects_.find(object_id);
if (it != required_objects_.end()) {
RAY_CHECK_OK(object_manager_.Cancel(object_id));
reconstruction_policy_.Cancel(object_id);
required_objects_.erase(it);
}
}
@@ -196,23 +208,71 @@ void TaskDependencyManager::TaskPending(const Task &task) {
TaskID task_id = task.GetTaskSpecification().TaskId();
// Record that the task is pending execution.
pending_tasks_.insert(task_id);
// Find any subscribed tasks that are dependent on objects created by the
// pending task.
auto remote_task_entry = required_tasks_.find(task_id);
if (remote_task_entry != required_tasks_.end()) {
for (const auto &object_entry : remote_task_entry->second) {
// This object created by the pending task will appear locally once the
// task completes execution. Cancel any in-progress operations to make
// the object local.
HandleRemoteDependencyCanceled(object_entry.first);
auto inserted =
pending_tasks_.emplace(task_id, PendingTask(initial_lease_period_ms_, io_service_));
if (inserted.second) {
// This is the first time we've heard that this task is pending. Find any
// subscribed tasks that are dependent on objects created by the pending
// task.
auto remote_task_entry = required_tasks_.find(task_id);
if (remote_task_entry != required_tasks_.end()) {
for (const auto &object_entry : remote_task_entry->second) {
// This object created by the pending task will appear locally once the
// task completes execution. Cancel any in-progress operations to make
// the object local.
HandleRemoteDependencyCanceled(object_entry.first);
}
}
// Acquire the lease for the task's execution in the global lease table.
AcquireTaskLease(task_id);
}
}
void TaskDependencyManager::AcquireTaskLease(const TaskID &task_id) {
auto it = pending_tasks_.find(task_id);
int64_t now_ms = current_time_ms();
if (it == pending_tasks_.end()) {
return;
}
// Check that we were able to renew the task lease before the previous one
// expired.
if (now_ms > it->second.expires_at) {
RAY_LOG(WARNING) << "Task lease to renew has already expired by "
<< (it->second.expires_at - now_ms) << "ms";
}
auto task_lease_data = std::make_shared<TaskLeaseDataT>();
task_lease_data->node_manager_id = client_id_.hex();
task_lease_data->acquired_at = current_sys_time_ms();
task_lease_data->timeout = it->second.lease_period;
RAY_CHECK_OK(task_lease_table_.Add(DriverID::nil(), task_id, task_lease_data, nullptr));
auto period = boost::posix_time::milliseconds(it->second.lease_period / 2);
it->second.lease_timer->expires_from_now(period);
it->second.lease_timer->async_wait(
[this, task_id](const boost::system::error_code &error) {
if (!error) {
AcquireTaskLease(task_id);
} else {
// Check that the error was due to the timer being canceled.
RAY_CHECK(error == boost::asio::error::operation_aborted);
}
});
it->second.expires_at = now_ms + it->second.lease_period;
it->second.lease_period *= 2;
}
void TaskDependencyManager::TaskCanceled(const TaskID &task_id) {
// Record that the task is no longer pending execution.
pending_tasks_.erase(task_id);
auto it = pending_tasks_.find(task_id);
if (it == pending_tasks_.end()) {
return;
}
pending_tasks_.erase(it);
// Find any subscribed tasks that are dependent on objects created by the
// canceled task.
auto remote_task_entry = required_tasks_.find(task_id);
+43 -2
View File
@@ -6,6 +6,7 @@
#include "ray/raylet/task.h"
#include "ray/object_manager/object_manager.h"
#include "ray/raylet/reconstruction_policy.h"
#include "ray/util/util.h"
// clang-format on
namespace ray {
@@ -27,7 +28,11 @@ class ReconstructionPolicy;
class TaskDependencyManager {
public:
/// Create a task dependency manager.
TaskDependencyManager(ObjectManagerInterface &object_manager);
TaskDependencyManager(ObjectManagerInterface &object_manager,
ReconstructionPolicyInterface &reconstruction_policy,
boost::asio::io_service &io_service, const ClientID &client_id,
int64_t initial_lease_period_ms,
gcs::TableInterface<TaskID, TaskLeaseData> &task_lease_table);
/// Check whether an object is locally available.
///
@@ -107,6 +112,21 @@ class TaskDependencyManager {
int64_t num_missing_dependencies;
};
struct PendingTask {
PendingTask(int64_t initial_lease_period_ms, boost::asio::io_service &io_service)
: lease_period(initial_lease_period_ms),
expires_at(INT64_MAX),
lease_timer(new boost::asio::deadline_timer(io_service)) {}
/// The timeout within which the lease should be renewed.
int64_t lease_period;
/// The time at which the current lease will expire, according to this
/// node's steady clock.
int64_t expires_at;
/// A timer used to determine when to next renew the lease.
std::unique_ptr<boost::asio::deadline_timer> lease_timer;
};
/// Check whether the given object needs to be made available through object
/// transfer or reconstruction. These are objects for which: (1) there is a
/// subscribed task dependent on it, (2) the object is not local, and (3) the
@@ -119,8 +139,29 @@ class TaskDependencyManager {
/// operations to make the object available through object transfer or
/// reconstruction.
void HandleRemoteDependencyCanceled(const ObjectID &object_id);
/// Acquire the task lease in the GCS for the given task. This is used to
/// indicate to other nodes that the task is currently pending on this node.
/// The task lease has an expiration time. If we do not renew the lease
/// before that time, then other nodes may choose to execute the task.
void AcquireTaskLease(const TaskID &task_id);
/// The object manager, used to fetch required objects from remote nodes.
ObjectManagerInterface &object_manager_;
/// The reconstruction policy, used to reconstruct required objects that no
/// longer exist on any live nodes.
ReconstructionPolicyInterface &reconstruction_policy_;
/// The event loop, used to set timers for renewing task leases. The task
/// leases are used to indicate which tasks are pending execution on this
/// node and must be periodically renewed.
boost::asio::io_service &io_service_;
/// This node's GCS client ID, used in the task lease information.
const ClientID client_id_;
/// For a given task, the expiration period of the initial task lease that is
/// added to the GCS. The lease expiration period is doubled every time the
/// lease is renewed.
const int64_t initial_lease_period_ms_;
/// The storage system for the task lease table.
gcs::TableInterface<TaskID, TaskLeaseData> &task_lease_table_;
/// A mapping from task ID of each subscribed task to its list of object
/// dependencies.
std::unordered_map<ray::TaskID, TaskDependencies> task_dependencies_;
@@ -137,7 +178,7 @@ class TaskDependencyManager {
std::unordered_set<ray::ObjectID> local_objects_;
/// The set of tasks that are pending execution. Any objects created by these
/// tasks that are not already local are pending creation.
std::unordered_set<ray::TaskID> pending_tasks_;
std::unordered_map<ray::TaskID, PendingTask> pending_tasks_;
};
} // namespace raylet
+79 -2
View File
@@ -19,13 +19,50 @@ class MockObjectManager : public ObjectManagerInterface {
MOCK_METHOD1(Cancel, ray::Status(const ObjectID &object_id));
};
class MockReconstructionPolicy : public ReconstructionPolicyInterface {
public:
MOCK_METHOD1(ListenAndMaybeReconstruct, void(const ObjectID &object_id));
MOCK_METHOD1(Cancel, void(const ObjectID &object_id));
};
class MockGcs : public gcs::TableInterface<TaskID, TaskLeaseData> {
public:
MOCK_METHOD4(
Add,
ray::Status(const JobID &job_id, const TaskID &task_id,
std::shared_ptr<TaskLeaseDataT> &task_data,
const gcs::TableInterface<TaskID, TaskLeaseData>::WriteCallback &done));
};
class TaskDependencyManagerTest : public ::testing::Test {
public:
TaskDependencyManagerTest()
: object_manager_mock_(), task_dependency_manager_(object_manager_mock_) {}
: object_manager_mock_(),
reconstruction_policy_mock_(),
io_service_(),
gcs_mock_(),
initial_lease_period_ms_(100),
task_dependency_manager_(object_manager_mock_, reconstruction_policy_mock_,
io_service_, ClientID::nil(), initial_lease_period_ms_,
gcs_mock_) {}
void Run(uint64_t timeout_ms) {
auto timer_period = boost::posix_time::milliseconds(timeout_ms);
auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_, timer_period);
timer->async_wait([this](const boost::system::error_code &error) {
ASSERT_FALSE(error);
io_service_.stop();
});
io_service_.run();
io_service_.reset();
}
protected:
MockObjectManager object_manager_mock_;
MockReconstructionPolicy reconstruction_policy_mock_;
boost::asio::io_service io_service_;
MockGcs gcs_mock_;
int64_t initial_lease_period_ms_;
TaskDependencyManager task_dependency_manager_;
};
@@ -74,6 +111,7 @@ TEST_F(TaskDependencyManagerTest, TestSimpleTask) {
// arguments should be remote.
for (const auto &argument_id : arguments) {
EXPECT_CALL(object_manager_mock_, Pull(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id));
}
// Subscribe to the task's dependencies.
bool ready = task_dependency_manager_.SubscribeDependencies(task_id, arguments);
@@ -82,6 +120,7 @@ TEST_F(TaskDependencyManagerTest, TestSimpleTask) {
// All arguments should be canceled as they become available locally.
for (const auto &argument_id : arguments) {
EXPECT_CALL(object_manager_mock_, Cancel(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id));
}
// For each argument except the last, tell the task dependency manager that
// the argument is local.
@@ -110,6 +149,7 @@ TEST_F(TaskDependencyManagerTest, TestDuplicateSubscribe) {
// duplicates of previous subscription calls. Each argument should only be
// requested from the node manager once.
EXPECT_CALL(object_manager_mock_, Pull(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id));
bool ready = task_dependency_manager_.SubscribeDependencies(task_id, arguments);
ASSERT_FALSE(ready);
}
@@ -117,6 +157,7 @@ TEST_F(TaskDependencyManagerTest, TestDuplicateSubscribe) {
// All arguments should be canceled as they become available locally.
for (const auto &argument_id : arguments) {
EXPECT_CALL(object_manager_mock_, Cancel(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id));
}
// For each argument except the last, tell the task dependency manager that
// the argument is local.
@@ -140,6 +181,7 @@ TEST_F(TaskDependencyManagerTest, TestMultipleTasks) {
// The object should only be requested from the object manager once for all
// three tasks.
EXPECT_CALL(object_manager_mock_, Pull(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id));
for (int i = 0; i < num_dependent_tasks; i++) {
TaskID task_id = TaskID::from_random();
dependent_tasks.push_back(task_id);
@@ -150,6 +192,7 @@ TEST_F(TaskDependencyManagerTest, TestMultipleTasks) {
// Tell the task dependency manager that the object is local.
EXPECT_CALL(object_manager_mock_, Cancel(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id));
auto ready_task_ids = task_dependency_manager_.HandleObjectLocal(argument_id);
// Check that all tasks are now ready to run.
ASSERT_EQ(ready_task_ids.size(), dependent_tasks.size());
@@ -169,7 +212,9 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) {
// No objects should be remote or canceled since each task depends on a
// locally queued task.
EXPECT_CALL(object_manager_mock_, Pull(_)).Times(0);
EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_)).Times(0);
EXPECT_CALL(object_manager_mock_, Cancel(_)).Times(0);
EXPECT_CALL(reconstruction_policy_mock_, Cancel(_)).Times(0);
for (const auto &task : tasks) {
// Subscribe to each of the tasks' arguments.
auto arguments = task.GetDependencies();
@@ -183,7 +228,9 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) {
ASSERT_FALSE(ready);
}
// Mark each task as pending.
// Mark each task as pending. A lease entry should be added to the GCS for
// each task.
EXPECT_CALL(gcs_mock_, Add(_, task.GetTaskSpecification().TaskId(), _, _));
task_dependency_manager_.TaskPending(task);
i++;
@@ -224,6 +271,7 @@ TEST_F(TaskDependencyManagerTest, TestDependentPut) {
// No objects have been registered in the task dependency manager, so the put
// object should be remote.
EXPECT_CALL(object_manager_mock_, Pull(put_id));
EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(put_id));
// Subscribe to the task's dependencies.
bool ready = task_dependency_manager_.SubscribeDependencies(
task2.GetTaskSpecification().TaskId(), {put_id});
@@ -232,6 +280,8 @@ TEST_F(TaskDependencyManagerTest, TestDependentPut) {
// The put object should be considered local as soon as the task that creates
// it is pending execution.
EXPECT_CALL(object_manager_mock_, Cancel(put_id));
EXPECT_CALL(reconstruction_policy_mock_, Cancel(put_id));
EXPECT_CALL(gcs_mock_, Add(_, task1.GetTaskSpecification().TaskId(), _, _));
task_dependency_manager_.TaskPending(task1);
}
@@ -244,6 +294,7 @@ TEST_F(TaskDependencyManagerTest, TestTaskForwarding) {
auto arguments = task.GetDependencies();
static_cast<void>(task_dependency_manager_.SubscribeDependencies(
task.GetTaskSpecification().TaskId(), arguments));
EXPECT_CALL(gcs_mock_, Add(_, task.GetTaskSpecification().TaskId(), _, _));
task_dependency_manager_.TaskPending(task);
}
@@ -256,11 +307,13 @@ TEST_F(TaskDependencyManagerTest, TestTaskForwarding) {
// The object returned by the first task should be considered remote once we
// cancel the forwarded task, since the second task depends on it.
EXPECT_CALL(object_manager_mock_, Pull(return_id));
EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(return_id));
task_dependency_manager_.TaskCanceled(task_id);
// Simulate the task executing on a remote node and its return value
// appearing locally.
EXPECT_CALL(object_manager_mock_, Cancel(return_id));
EXPECT_CALL(reconstruction_policy_mock_, Cancel(return_id));
auto ready_tasks = task_dependency_manager_.HandleObjectLocal(return_id);
// Check that the task that we kept is now ready to run.
ASSERT_EQ(ready_tasks.size(), 1);
@@ -279,6 +332,7 @@ TEST_F(TaskDependencyManagerTest, TestEviction) {
// arguments should be remote.
for (const auto &argument_id : arguments) {
EXPECT_CALL(object_manager_mock_, Pull(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id));
}
// Subscribe to the task's dependencies.
bool ready = task_dependency_manager_.SubscribeDependencies(task_id, arguments);
@@ -288,6 +342,7 @@ TEST_F(TaskDependencyManagerTest, TestEviction) {
// available.
for (const auto &argument_id : arguments) {
EXPECT_CALL(object_manager_mock_, Cancel(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id));
}
for (size_t i = 0; i < arguments.size(); i++) {
std::vector<TaskID> ready_tasks;
@@ -304,6 +359,7 @@ TEST_F(TaskDependencyManagerTest, TestEviction) {
// considered remote.
for (const auto &argument_id : arguments) {
EXPECT_CALL(object_manager_mock_, Pull(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id));
}
for (size_t i = 0; i < arguments.size(); i++) {
std::vector<TaskID> waiting_tasks;
@@ -324,6 +380,7 @@ TEST_F(TaskDependencyManagerTest, TestEviction) {
// again.
for (const auto &argument_id : arguments) {
EXPECT_CALL(object_manager_mock_, Cancel(argument_id));
EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id));
}
for (size_t i = 0; i < arguments.size(); i++) {
std::vector<TaskID> ready_tasks;
@@ -337,6 +394,26 @@ TEST_F(TaskDependencyManagerTest, TestEviction) {
}
}
TEST_F(TaskDependencyManagerTest, TestTaskLeaseRenewal) {
// Mark a task as pending.
auto task = ExampleTask({}, 0);
// We expect an initial call to acquire the lease.
EXPECT_CALL(gcs_mock_, Add(_, task.GetTaskSpecification().TaskId(), _, _));
task_dependency_manager_.TaskPending(task);
// Check that while the task is still pending, there is one call to renew the
// lease for each lease period that passes. The lease period doubles with
// each renewal.
int num_expected_calls = 4;
int64_t sleep_time = 0;
for (int i = 1; i <= num_expected_calls; i++) {
sleep_time += i * initial_lease_period_ms_;
}
EXPECT_CALL(gcs_mock_, Add(_, task.GetTaskSpecification().TaskId(), _, _))
.Times(num_expected_calls);
Run(sleep_time);
}
} // namespace raylet
} // namespace ray
+13 -3
View File
@@ -3,17 +3,27 @@
#include <chrono>
/// Return the number of milliseconds since the Unix epoch.
/// Return the number of milliseconds since the steady clock epoch. NOTE: The
/// returned timestamp may be used for accurately measuring intervals but has
/// no relation to wall clock time. It must not be used for synchronization
/// across multiple nodes.
///
/// TODO(rkn): This function appears in multiple places. It should be
/// deduplicated.
///
/// \return The number of milliseconds since the Unix epoch.
int64_t current_time_ms() {
/// \return The number of milliseconds since the steady clock epoch.
inline int64_t current_time_ms() {
std::chrono::milliseconds ms_since_epoch =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch());
return ms_since_epoch.count();
}
inline int64_t current_sys_time_ms() {
std::chrono::milliseconds ms_since_epoch =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch());
return ms_since_epoch.count();
}
#endif // RAY_UTIL_UTIL_H