diff --git a/.travis.yml b/.travis.yml index b583b9d75..cd99c4dda 100644 --- a/.travis.yml +++ b/.travis.yml @@ -179,6 +179,7 @@ install: - ./src/ray/raylet/worker_pool_test - ./src/ray/raylet/lineage_cache_test - ./src/ray/raylet/task_dependency_manager_test + - ./src/ray/raylet/reconstruction_policy_test - bash ../src/common/test/run_tests.sh - bash ../src/plasma/test/run_tests.sh diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h index df2f12554..4096bdae5 100644 --- a/src/common/state/ray_config.h +++ b/src/common/state/ray_config.h @@ -18,6 +18,10 @@ class RayConfig { int64_t num_heartbeats_timeout() const { return num_heartbeats_timeout_; } + int64_t initial_reconstruction_timeout_milliseconds() const { + return initial_reconstruction_timeout_milliseconds_; + } + int64_t get_timeout_milliseconds() const { return get_timeout_milliseconds_; } uint64_t max_lineage_size() const { return max_lineage_size_; } @@ -111,6 +115,7 @@ class RayConfig { : ray_protocol_version_(0x0000000000000000), heartbeat_timeout_milliseconds_(100), num_heartbeats_timeout_(100), + initial_reconstruction_timeout_milliseconds_(200), get_timeout_milliseconds_(1000), worker_get_request_size_(10000), worker_fetch_request_size_(10000), @@ -157,6 +162,12 @@ class RayConfig { /// it as dead to the db_client table. int64_t num_heartbeats_timeout_; + /// The initial period for a task execution lease. The lease will expire this + /// many milliseconds after the first acquisition of the lease. Nodes that + /// require an object will not try to reconstruct the task until at least + /// this many milliseconds. + int64_t initial_reconstruction_timeout_milliseconds_; + /// These are used by the worker to set timeouts and to batch requests when /// getting objects. int64_t get_timeout_milliseconds_; diff --git a/src/ray/gcs/client.cc b/src/ray/gcs/client.cc index ecae1f1a0..d7041d328 100644 --- a/src/ray/gcs/client.cc +++ b/src/ray/gcs/client.cc @@ -15,6 +15,7 @@ AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_ty task_table_.reset(new TaskTable(context_, this, command_type)); raylet_task_table_.reset(new raylet::TaskTable(context_, this, command_type)); task_reconstruction_log_.reset(new TaskReconstructionLog(context_, this)); + task_lease_table_.reset(new TaskLeaseTable(context_, this)); heartbeat_table_.reset(new HeartbeatTable(context_, this)); error_table_.reset(new ErrorTable(primary_context_, this)); profile_table_.reset(new ProfileTable(context_, this)); @@ -75,6 +76,8 @@ TaskReconstructionLog &AsyncGcsClient::task_reconstruction_log() { return *task_reconstruction_log_; } +TaskLeaseTable &AsyncGcsClient::task_lease_table() { return *task_lease_table_; } + ClientTable &AsyncGcsClient::client_table() { return *client_table_; } FunctionTable &AsyncGcsClient::function_table() { return *function_table_; } diff --git a/src/ray/gcs/client.h b/src/ray/gcs/client.h index 852db93c5..8a72dbed9 100644 --- a/src/ray/gcs/client.h +++ b/src/ray/gcs/client.h @@ -56,6 +56,7 @@ class RAY_EXPORT AsyncGcsClient { raylet::TaskTable &raylet_task_table(); ActorTable &actor_table(); TaskReconstructionLog &task_reconstruction_log(); + TaskLeaseTable &task_lease_table(); ClientTable &client_table(); HeartbeatTable &heartbeat_table(); ErrorTable &error_table(); @@ -80,6 +81,7 @@ class RAY_EXPORT AsyncGcsClient { std::unique_ptr raylet_task_table_; std::unique_ptr actor_table_; std::unique_ptr task_reconstruction_log_; + std::unique_ptr task_lease_table_; std::unique_ptr heartbeat_table_; std::unique_ptr error_table_; std::unique_ptr profile_table_; diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index 115a03349..72b2a62bd 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -16,6 +16,7 @@ enum TablePrefix:int { HEARTBEAT, ERROR_INFO, PROFILE, + TASK_LEASE, } // The channel that Add operations to the Table should be published on, if any. @@ -28,6 +29,7 @@ enum TablePubsub:int { ACTOR, HEARTBEAT, ERROR_INFO, + TASK_LEASE, } table GcsTableEntry { @@ -188,3 +190,15 @@ table HeartbeatTableData { resources_total_label: [string]; resources_total_capacity: [double]; } + +// Data for a lease on task execution. +table TaskLeaseData { + // Node manager client ID. + node_manager_id: string; + // The time that the lease was last acquired at. NOTE(swang): This is the + // system clock time according to the node that added the entry and is not + // synchronized with other nodes. + acquired_at: long; + // The period that the lease is active for. + timeout: long; +} diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index c570f6930..54f30aa28 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -321,10 +321,12 @@ void ClientTable::HandleNotification(AsyncGcsClient *client, if (client_added_callback_ != nullptr) { client_added_callback_(client, client_id, data); } + RAY_CHECK(removed_clients_.find(client_id) == removed_clients_.end()); } else { if (client_removed_callback_ != nullptr) { client_removed_callback_(client, client_id, data); } + removed_clients_.insert(client_id); } } } @@ -335,9 +337,13 @@ void ClientTable::HandleConnected(AsyncGcsClient *client, const ClientTableDataT << client_id_; } -const ClientID &ClientTable::GetLocalClientId() { return client_id_; } +const ClientID &ClientTable::GetLocalClientId() const { return client_id_; } -const ClientTableDataT &ClientTable::GetLocalClient() { return local_client_; } +const ClientTableDataT &ClientTable::GetLocalClient() const { return local_client_; } + +bool ClientTable::IsRemoved(const ClientID &client_id) const { + return removed_clients_.count(client_id) == 1; +} Status ClientTable::Connect(const ClientTableDataT &local_client) { RAY_CHECK(!disconnected_) << "Tried to reconnect a disconnected client."; @@ -397,7 +403,7 @@ ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) { return Append(JobID::nil(), client_log_key_, data, nullptr); } -const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) { +const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) const { RAY_CHECK(!client_id.is_nil()); auto entry = client_cache_.find(client_id); if (entry != client_cache_.end()) { @@ -405,7 +411,7 @@ const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) { } else { // If the requested client was not found, return a reference to the nil // client entry. - return client_cache_[ClientID::nil()]; + return client_cache_.at(ClientID::nil()); } } @@ -415,6 +421,7 @@ template class Table; template class Table; template class Log; template class Log; +template class Table; template class Table; template class Log; template class Log; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index fc2d9a194..50424d661 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "ray/constants.h" #include "ray/id.h" @@ -346,6 +347,15 @@ class TaskReconstructionLog : public Log { } }; +class TaskLeaseTable : public Table { + public: + TaskLeaseTable(const std::shared_ptr &context, AsyncGcsClient *client) + : Table(context, client) { + pubsub_channel_ = TablePubsub::TASK_LEASE; + prefix_ = TablePrefix::TASK_LEASE; + } +}; + namespace raylet { class TaskTable : public Table { @@ -578,17 +588,23 @@ class ClientTable : private Log { /// \param client The client to get information about. /// \return A reference to the requested client. If the client is not in the /// cache, then an entry with a nil ClientID will be returned. - const ClientTableDataT &GetClient(const ClientID &client); + const ClientTableDataT &GetClient(const ClientID &client) const; /// Get the local client's ID. /// /// \return The local client's ID. - const ClientID &GetLocalClientId(); + const ClientID &GetLocalClientId() const; /// Get the local client's information. /// /// \return The local client's information. - const ClientTableDataT &GetLocalClient(); + const ClientTableDataT &GetLocalClient() const; + + /// Check whether the given client is removed. + /// + /// \param client_id The ID of the client to check. + /// \return Whether the client with ID client_id is removed. + bool IsRemoved(const ClientID &client_id) const; private: /// Handle a client table notification. @@ -612,6 +628,8 @@ class ClientTable : private Log { ClientTableCallback client_removed_callback_; /// A cache for information about all clients. std::unordered_map client_cache_; + /// The set of removed clients. + std::unordered_set removed_clients_; }; } // namespace gcs diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 82ddfcb2e..b1367b88a 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -2,13 +2,15 @@ namespace ray { -ObjectDirectory::ObjectDirectory(std::shared_ptr &gcs_client) { - gcs_client_ = gcs_client; -} +ObjectDirectory::ObjectDirectory(std::shared_ptr &gcs_client) + : gcs_client_(gcs_client) {} + +namespace { std::vector UpdateObjectLocations( std::unordered_set &client_ids, - const std::vector &location_history) { + const std::vector &location_history, + const ray::gcs::ClientTable &client_table) { // location_history contains the history of locations of the object (it is a log), // which might look like the following: // client1.is_eviction = false @@ -24,9 +26,19 @@ std::vector UpdateObjectLocations( client_ids.erase(client_id); } } + // Filter out the removed clients from the object locations. + for (auto it = client_ids.begin(); it != client_ids.end();) { + if (client_table.IsRemoved(*it)) { + it = client_ids.erase(it); + } else { + it++; + } + } return std::vector(client_ids.begin(), client_ids.end()); } +} // namespace + void ObjectDirectory::RegisterBackend() { auto object_notification_callback = [this]( gcs::AsyncGcsClient *client, const ObjectID &object_id, @@ -38,8 +50,9 @@ void ObjectDirectory::RegisterBackend() { return; } // Update entries for this object. - std::vector client_id_vec = UpdateObjectLocations( - object_id_listener_pair->second.current_object_locations, location_history); + std::vector client_id_vec = + UpdateObjectLocations(object_id_listener_pair->second.current_object_locations, + location_history, gcs_client_->client_table()); if (!client_id_vec.empty()) { // Copy the callbacks so that the callbacks can unsubscribe without interrupting // looping over the callbacks. @@ -148,12 +161,12 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, JobID job_id = JobID::nil(); ray::Status status = gcs_client_->object_table().Lookup( job_id, object_id, - [callback](gcs::AsyncGcsClient *client, const ObjectID &object_id, - const std::vector &location_history) { + [this, callback](gcs::AsyncGcsClient *client, const ObjectID &object_id, + const std::vector &location_history) { // Build the set of current locations based on the entries in the log. std::unordered_set client_ids; - std::vector locations_vector = - UpdateObjectLocations(client_ids, location_history); + std::vector locations_vector = UpdateObjectLocations( + client_ids, location_history, gcs_client_->client_table()); callback(locations_vector, object_id); }); return status; diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index 1cf4323e8..af48cc182 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -141,10 +141,10 @@ class ObjectDirectory : public ObjectDirectoryInterface { std::unordered_set current_object_locations; }; - /// Info about subscribers to object locations. - std::unordered_map listeners_; /// Reference to the gcs client. std::shared_ptr gcs_client_; + /// Info about subscribers to object locations. + std::unordered_map listeners_; /// Map from object ID to the number of times it's been evicted on this /// node before. std::unordered_map object_evictions_; diff --git a/src/ray/raylet/CMakeLists.txt b/src/ray/raylet/CMakeLists.txt index 735ed4174..07b3525ee 100644 --- a/src/ray/raylet/CMakeLists.txt +++ b/src/ray/raylet/CMakeLists.txt @@ -37,6 +37,7 @@ ADD_RAY_TEST(worker_pool_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} $ ADD_RAY_TEST(task_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY}) ADD_RAY_TEST(lineage_cache_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY}) ADD_RAY_TEST(task_dependency_manager_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY}) +ADD_RAY_TEST(reconstruction_policy_test STATIC_LINK_LIBS ray_static gtest gtest_main gmock_main pthread ${Boost_SYSTEM_LIBRARY}) include_directories(${GCS_FBS_OUTPUT_DIRECTORY}) add_library(rayletlib raylet.cc ${NODE_MANAGER_FBS_OUTPUT_FILES}) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index bdb6c389e..1072e5a6f 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -5,7 +5,6 @@ // gen_local_scheduler_fbs from src/ray/CMakeLists.txt. #include "local_scheduler/format/local_scheduler_generated.h" #include "ray/raylet/format/node_manager_generated.h" -#include "ray/util/util.h" namespace { @@ -91,8 +90,16 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, config.worker_command), local_queues_(SchedulingQueue()), scheduling_policy_(local_queues_), - reconstruction_policy_([this](const TaskID &task_id) { ResubmitTask(task_id); }), - task_dependency_manager_(object_manager), + reconstruction_policy_( + io_service_, [this](const TaskID &task_id) { ResubmitTask(task_id); }, + RayConfig::instance().initial_reconstruction_timeout_milliseconds(), + gcs_client_->client_table().GetLocalClientId(), gcs_client->task_lease_table(), + std::make_shared(gcs_client)), + task_dependency_manager_( + object_manager, reconstruction_policy_, io_service, + gcs_client_->client_table().GetLocalClientId(), + RayConfig::instance().initial_reconstruction_timeout_milliseconds(), + gcs_client->task_lease_table()), lineage_cache_(gcs_client_->client_table().GetLocalClientId(), gcs_client->raylet_task_table(), gcs_client->raylet_task_table(), config.max_lineage_size), @@ -130,6 +137,30 @@ ray::Status NodeManager::RegisterGcs() { JobID::nil(), gcs_client_->client_table().GetLocalClientId(), task_committed_callback, nullptr, nullptr)); + const auto task_lease_notification_callback = [this](gcs::AsyncGcsClient *client, + const TaskID &task_id, + const TaskLeaseDataT &task_lease) { + const ClientID node_manager_id = ClientID::from_binary(task_lease.node_manager_id); + if (gcs_client_->client_table().IsRemoved(node_manager_id)) { + // The node manager that added the task lease is already removed. The + // lease is considered inactive. + reconstruction_policy_.HandleTaskLeaseNotification(task_id, 0); + } else { + // NOTE(swang): The task_lease.timeout is an overestimate of the lease's + // expiration period since the entry may have been in the GCS for some + // time already. For a more accurate estimate, the age of the entry in + // the GCS should be subtracted from task_lease.timeout. + reconstruction_policy_.HandleTaskLeaseNotification(task_id, task_lease.timeout); + } + }; + const auto task_lease_empty_callback = [this](gcs::AsyncGcsClient *client, + const TaskID &task_id) { + reconstruction_policy_.HandleTaskLeaseNotification(task_id, 0); + }; + RAY_RETURN_NOT_OK(gcs_client_->task_lease_table().Subscribe( + JobID::nil(), gcs_client_->client_table().GetLocalClientId(), + task_lease_notification_callback, task_lease_empty_callback, nullptr)); + // Register a callback for actor creation notifications. auto actor_creation_callback = [this]( gcs::AsyncGcsClient *client, const ActorID &actor_id, @@ -212,14 +243,6 @@ void NodeManager::Heartbeat() { void NodeManager::ClientAdded(const ClientTableDataT &client_data) { ClientID client_id = ClientID::from_binary(client_data.client_id); - // Make sure the client hasn't already been removed. - if (removed_clients_.find(client_id) != removed_clients_.end()) { - // This client has already been removed, so don't do anything. - RAY_LOG(INFO) << "The client " << client_id << " has already been removed, so it " - << "can't be added. This is very unusual."; - return; - } - RAY_LOG(DEBUG) << "[ClientAdded] received callback from client id " << client_id; if (client_id == gcs_client_->client_table().GetLocalClientId()) { // We got a notification for ourselves, so we are connected to the GCS now. @@ -258,17 +281,11 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) { } void NodeManager::ClientRemoved(const ClientTableDataT &client_data) { + // TODO(swang): If we receive a notification for our own death, clean up and + // exit immediately. const ClientID client_id = ClientID::from_binary(client_data.client_id); RAY_LOG(DEBUG) << "[ClientRemoved] received callback from client id " << client_id; - // If the client has already been removed, don't do anything. - if (removed_clients_.find(client_id) != removed_clients_.end()) { - RAY_LOG(INFO) << "The client " << client_id << " has already been removed. This " - << "should be very unusual."; - return; - } - removed_clients_.insert(client_id); - RAY_CHECK(client_id != gcs_client_->client_table().GetLocalClientId()) << "Exiting because this node manager has mistakenly been marked dead by the " << "monitor."; @@ -767,10 +784,6 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag bool forwarded) { // Add the task and its uncommitted lineage to the lineage cache. lineage_cache_.AddWaitingTask(task, uncommitted_lineage); - // Mark the task as pending. Once the task has finished execution, or once it - // has been forwarded to another node, the task must be marked as canceled in - // the TaskDependencyManager. - task_dependency_manager_.TaskPending(task); const TaskSpecification &spec = task.GetTaskSpecification(); if (spec.IsActorTask()) { @@ -791,7 +804,7 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag } else { // The actor is remote. Forward the task to the node manager that owns // the actor. - if (removed_clients_.find(node_manager_id) != removed_clients_.end()) { + if (gcs_client_->client_table().IsRemoved(node_manager_id)) { // The remote node manager is dead, so handle the fact that this actor // is also dead. TreatTaskAsFailed(spec); @@ -824,6 +837,10 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag // Keep the task queued until we discover the actor's location. // (See design_docs/task_states.rst for the state transition diagram.) local_queues_.QueueMethodsWaitingForActorCreation({task}); + // Mark the task as pending. It will be canceled once we discover the + // actor's location and either execute the task ourselves or forward it + // to another node. + task_dependency_manager_.TaskPending(task); } } else { // This is a non-actor task. Queue the task for a placement decision or for dispatch @@ -914,20 +931,11 @@ void NodeManager::HandleWorkerUnblocked(std::shared_ptr worker) { worker->MarkUnblocked(); } -void NodeManager::HandleRemoteDependencyRequired(const ObjectID &dependency_id) { - // Try to fetch the object from the object manager. - RAY_CHECK_OK(object_manager_.Pull(dependency_id)); - // TODO(swang): Request reconstruction of the object, possibly after a - // timeout. -} - -void NodeManager::HandleRemoteDependencyCanceled(const ObjectID &dependency_id) { - // Cancel the fetch request from the object manager. - RAY_CHECK_OK(object_manager_.Cancel(dependency_id)); - // TODO(swang): Cancel reconstruction of the object. -} - void NodeManager::EnqueuePlaceableTask(const Task &task) { + // Mark the task as pending. Once the task has finished execution, or once it + // has been forwarded to another node, the task must be marked as canceled in + // the TaskDependencyManager. + task_dependency_manager_.TaskPending(task); // TODO(atumanov): add task lookup hashmap and change EnqueuePlaceableTask to take // a vector of TaskIDs. Trigger MoveTask internally. // Subscribe to the task's dependencies. @@ -1101,7 +1109,7 @@ void NodeManager::FinishAssignedTask(Worker &worker) { } void NodeManager::ResubmitTask(const TaskID &task_id) { - throw std::runtime_error("Method not implemented"); + RAY_LOG(WARNING) << "Task re-execution is not currently implemented"; } void NodeManager::HandleObjectLocal(const ObjectID &object_id) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d8c180ba3..dd7d23a91 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -137,14 +137,6 @@ class NodeManager { /// \return Void. void CleanUpTasksForDeadActor(const ActorID &actor_id); - /// Methods for managing object dependencies. - /// Handle a dependency required by a queued task that is missing locally. - /// The dependency is (1) on a remote node, (2) pending creation on a remote - /// node, or (3) missing from all nodes and requires reconstruction. - void HandleRemoteDependencyRequired(const ObjectID &dependency_id); - /// Handle a dependency that was previously required by a queued task that is - /// no longer required. - void HandleRemoteDependencyCanceled(const ObjectID &dependency_id); /// Handle an object becoming local. This updates any local accounting, but /// does not write to any global accounting in the GCS. void HandleObjectLocal(const ObjectID &object_id); @@ -180,9 +172,6 @@ class NodeManager { /// The lineage cache for the GCS object and task tables. LineageCache lineage_cache_; std::vector remote_clients_; - /// A set of all of the remote clients that have been removed. In principle, - /// this could grow unbounded. - std::unordered_set removed_clients_; std::unordered_map remote_server_connections_; /// A mapping from actor ID to registration information about that actor /// (including which node manager owns it). diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index d22c8c966..e0e175b47 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -4,8 +4,169 @@ namespace ray { namespace raylet { -void ReconstructionPolicy::CheckObjectReconstruction(const ObjectID &object) { - throw std::runtime_error("Method not implemented"); +ReconstructionPolicy::ReconstructionPolicy( + boost::asio::io_service &io_service, + std::function reconstruction_handler, + int64_t initial_reconstruction_timeout_ms, const ClientID &client_id, + gcs::PubsubInterface &task_lease_pubsub, + std::shared_ptr object_directory) + : io_service_(io_service), + reconstruction_handler_(reconstruction_handler), + initial_reconstruction_timeout_ms_(initial_reconstruction_timeout_ms), + client_id_(client_id), + task_lease_pubsub_(task_lease_pubsub), + object_directory_(std::move(object_directory)) {} + +void ReconstructionPolicy::SetTaskTimeout( + std::unordered_map::iterator task_it, + int64_t timeout_ms) { + task_it->second.expires_at = current_time_ms() + timeout_ms; + auto timeout = boost::posix_time::milliseconds(timeout_ms); + task_it->second.reconstruction_timer->expires_from_now(timeout); + const TaskID task_id = task_it->first; + task_it->second.reconstruction_timer->async_wait( + [this, task_id](const boost::system::error_code &error) { + if (!error) { + auto it = listening_tasks_.find(task_id); + if (it == listening_tasks_.end()) { + return; + } + if (it->second.subscribed) { + // If the timer expired and we were subscribed to notifications, + // then this means that we did not receive a task lease + // notification within the lease period. Otherwise, the timer + // would have been reset when the most recent notification was + // received. The current lease is now considered expired. + HandleTaskLeaseExpired(task_id); + } else { + // This task is still required, so subscribe to task lease + // notifications. Reconstruction will be triggered if the current + // task lease expires, or if no one has acquired the task lease. + // NOTE(swang): When reconstruction for a task is first requested, + // we do not initially subscribe to task lease notifications, which + // requires at least one GCS operation. This is in case the objects + // required by the task are no longer needed soon after. If the + // task is still required after this initial period, then we now + // subscribe to task lease notifications. + RAY_CHECK_OK(task_lease_pubsub_.RequestNotifications(JobID::nil(), task_id, + client_id_)); + it->second.subscribed = true; + } + } else { + // Check that the error was due to the timer being canceled. + RAY_CHECK(error == boost::asio::error::operation_aborted); + } + }); +} + +void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id, + const ObjectID &required_object_id, + int reconstruction_attempt) { + // If we are no longer listening for objects created by this task, give up. + auto it = listening_tasks_.find(task_id); + if (it == listening_tasks_.end()) { + return; + } + + // If the object is no longer required, give up. + if (it->second.created_objects.count(required_object_id) == 0) { + return; + } + + // Suppress duplicate reconstructions of the same task. This can happen if, + // for example, a task creates two different objects that both require + // reconstruction. + if (reconstruction_attempt != it->second.reconstruction_attempt) { + // Through some other path, reconstruction was already attempted more than + // reconstruction_attempt many times. + return; + } + // Increment the number of times reconstruction has been attempted. This is + // used to suppress duplicate reconstructions of the same task. + it->second.reconstruction_attempt++; + + // Reset the timer to wait for task lease notifications again. NOTE(swang): + // The timer should already be set here, but we extend it to give some time + // for the reconstructed task to propagate notifications. + SetTaskTimeout(it, initial_reconstruction_timeout_ms_); + // TODO(swang): Suppress simultaneous attempts to reconstruct the task using + // the task reconstruction log. + reconstruction_handler_(task_id); +} + +void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) { + auto it = listening_tasks_.find(task_id); + RAY_CHECK(it != listening_tasks_.end()); + int reconstruction_attempt = it->second.reconstruction_attempt; + // Lookup the objects created by this task in the object directory. If any + // objects no longer exist on any live nodes, then reconstruction will be + // attempted asynchronously. + for (const auto &created_object_id : it->second.created_objects) { + RAY_CHECK_OK(object_directory_->LookupLocations( + created_object_id, + [this, task_id, reconstruction_attempt](const std::vector &clients, + const ray::ObjectID &object_id) { + if (clients.empty()) { + // The required object no longer exists on any live nodes. Attempt + // reconstruction. + AttemptReconstruction(task_id, object_id, reconstruction_attempt); + } + })); + } + // Reset the timer to wait for task lease notifications again. + SetTaskTimeout(it, initial_reconstruction_timeout_ms_); +} + +void ReconstructionPolicy::HandleTaskLeaseNotification(const TaskID &task_id, + int64_t lease_timeout_ms) { + auto it = listening_tasks_.find(task_id); + if (it == listening_tasks_.end()) { + // We are no longer listening for this task, so ignore the notification. + return; + } + + if (lease_timeout_ms == 0) { + HandleTaskLeaseExpired(task_id); + } else if ((current_time_ms() + lease_timeout_ms) > it->second.expires_at) { + // The current lease is longer than the timer's current expiration time. + // Reset the timer according to the current lease. + SetTaskTimeout(it, lease_timeout_ms); + } +} + +void ReconstructionPolicy::ListenAndMaybeReconstruct(const ObjectID &object_id) { + TaskID task_id = ComputeTaskId(object_id); + auto it = listening_tasks_.find(task_id); + // Add this object to the list of objects created by the same task. + if (it == listening_tasks_.end()) { + auto inserted = listening_tasks_.emplace(task_id, ReconstructionTask(io_service_)); + it = inserted.first; + // Set a timer for the task that created the object. If the lease for that + // task expires, then reconstruction of that task will be triggered. + SetTaskTimeout(it, initial_reconstruction_timeout_ms_); + } + it->second.created_objects.insert(object_id); +} + +void ReconstructionPolicy::Cancel(const ObjectID &object_id) { + TaskID task_id = ComputeTaskId(object_id); + auto it = listening_tasks_.find(task_id); + if (it == listening_tasks_.end()) { + // We already stopped listening for this task. + return; + } + + it->second.created_objects.erase(object_id); + // If there are no more needed objects created by this task, stop listening + // for notifications. + if (it->second.created_objects.empty()) { + listening_tasks_.erase(it); + // Cancel notifications for the task lease if we were subscribed to them. + if (it->second.subscribed) { + RAY_CHECK_OK( + task_lease_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_)); + } + } } } // namespace raylet diff --git a/src/ray/raylet/reconstruction_policy.h b/src/ray/raylet/reconstruction_policy.h index 4f8ca7069..cccdc7d24 100644 --- a/src/ray/raylet/reconstruction_policy.h +++ b/src/ray/raylet/reconstruction_policy.h @@ -2,33 +2,133 @@ #define RAY_RAYLET_RECONSTRUCTION_POLICY_H #include +#include +#include +#include + +#include "ray/gcs/tables.h" #include "ray/id.h" +#include "ray/util/util.h" + +#include "ray/object_manager/object_directory.h" namespace ray { namespace raylet { -// TODO(swang): Use std::function instead of boost. +class ReconstructionPolicyInterface { + public: + virtual void ListenAndMaybeReconstruct(const ObjectID &object_id) = 0; + virtual void Cancel(const ObjectID &object_id) = 0; + virtual ~ReconstructionPolicyInterface(){}; +}; -class ReconstructionPolicy { +class ReconstructionPolicy : public ReconstructionPolicyInterface { public: /// Create the reconstruction policy. /// + /// \param io_service The event loop to attach reconstruction timers to. /// \param reconstruction_handler The handler to call if a task needs to be /// re-executed. - // TODO(swang): This requires at minimum references to the Raylet's lineage - // cache and GCS client. - ReconstructionPolicy(std::function reconstruction_handler) {} + /// \param initial_reconstruction_timeout_ms The initial timeout within which + /// a task lease notification must be received. Otherwise, reconstruction + /// will be triggered. + /// \param client_id The client ID to use when requesting notifications from + /// the GCS. + /// \param task_lease_pubsub The GCS pub-sub storage system to request task + /// lease notifications from. + ReconstructionPolicy(boost::asio::io_service &io_service, + std::function reconstruction_handler, + int64_t initial_reconstruction_timeout_ms, + const ClientID &client_id, + gcs::PubsubInterface &task_lease_pubsub, + std::shared_ptr object_directory); - /// Check whether an object requires reconstruction. If this object requires - /// reconstruction, the registered task reconstruction handler will be called - /// for each task that needs to be re-executed. + /// Listen for task lease notifications about an object that may require + /// reconstruction. If no notifications are received within the initial + /// timeout, then the registered task reconstruction handler will be called + /// for the task that created the object. /// /// \param object_id The object to check for reconstruction. - void CheckObjectReconstruction(const ObjectID &object_id); + void ListenAndMaybeReconstruct(const ObjectID &object_id); + + /// Cancel listening for an object. Notifications for the object will be + /// ignored. This does not cancel a reconstruction attempt that is already in + /// progress. + /// + /// \param object_id The object to cancel. + void Cancel(const ObjectID &object_id); + + /// Handle a notification for a task lease. This handler should be called to + /// indicate that a task is currently being executed, so any objects that it + /// creates should not be reconstructed. + /// + /// \param task_id The task ID of the task being executed. + /// \param lease_timeout_ms After this timeout, the task's lease is + /// guaranteed to be expired. If a second notification is not received within + /// this timeout, then objects that the task creates may be reconstructed. + void HandleTaskLeaseNotification(const TaskID &task_id, int64_t lease_timeout_ms); private: + struct ReconstructionTask { + ReconstructionTask(boost::asio::io_service &io_service) + : expires_at(INT64_MAX), + subscribed(false), + reconstruction_attempt(0), + reconstruction_timer(new boost::asio::deadline_timer(io_service)) {} + + // The objects created by this task that we are listening for notifications for. + std::unordered_set created_objects; + // The time at which the timer for this task expires, according to this + // node's steady clock. + int64_t expires_at; + // Whether we are subscribed to lease notifications for this task. + bool subscribed; + // The number of times we've attempted reconstructing this task so far. + int reconstruction_attempt; + // The task's reconstruction timer. If this expires before a lease + // notification is received, then the task will be reconstructed. + std::unique_ptr reconstruction_timer; + }; + + /// Set the reconstruction timer for a task. If no task lease notifications + /// are received within the timeout, then reconstruction will be triggered. + /// If the timer was previously set, this method will cancel it and reset the + /// timer to the new timeout. + void SetTaskTimeout(std::unordered_map::iterator task_it, + int64_t timeout_ms); + + /// Attempt to re-execute a task to reconstruct the required object. + /// + /// \param task_id The task to attempt to re-execute. + /// \param required_object_id The object created by the task that requires + /// reconstruction. + /// \param reconstruction_attempt What number attempt this is at + /// reconstructing the task. This is used to suppress duplicate + /// reconstructions of the same task (e.g., if a task creates two objects + /// that both require reconstruction). + void AttemptReconstruction(const TaskID &task_id, const ObjectID &required_object_id, + int reconstruction_attempt); + + /// Handle expiration of a task lease. + void HandleTaskLeaseExpired(const TaskID &task_id); + + /// The event loop. + boost::asio::io_service &io_service_; + /// The handler to call for tasks that require reconstruction. + const std::function reconstruction_handler_; + /// The initial timeout within which a task lease notification must be + /// received. Otherwise, reconstruction will be triggered. + const int64_t initial_reconstruction_timeout_ms_; + /// The client ID to use when requesting notifications from the GCS. + const ClientID client_id_; + /// The GCS pub-sub storage system to request task lease notifications from. + gcs::PubsubInterface &task_lease_pubsub_; + /// The object directory used to lookup object locations. + std::shared_ptr object_directory_; + /// The tasks that we are currently subscribed to in the GCS. + std::unordered_map listening_tasks_; }; } // namespace raylet diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc new file mode 100644 index 000000000..828c6d4cb --- /dev/null +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -0,0 +1,330 @@ +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include + +#include "ray/raylet/format/node_manager_generated.h" +#include "ray/raylet/reconstruction_policy.h" + +#include "ray/object_manager/object_directory.h" + +namespace ray { + +namespace raylet { + +class MockObjectDirectory : public ObjectDirectoryInterface { + public: + MockObjectDirectory() {} + + ray::Status LookupLocations(const ObjectID &object_id, + const OnLocationsFound &callback) { + callbacks_.push_back({object_id, callback}); + return ray::Status::OK(); + } + + void FlushCallbacks() { + for (const auto &callback : callbacks_) { + const ObjectID object_id = callback.first; + callback.second(locations_[object_id], object_id); + } + callbacks_.clear(); + } + + void SetObjectLocations(const ObjectID &object_id, std::vector locations) { + locations_[object_id] = locations; + } + + MOCK_METHOD0(RegisterBackend, void(void)); + MOCK_METHOD3(GetInformation, ray::Status(const ClientID &, const InfoSuccessCallback &, + const InfoFailureCallback &)); + MOCK_METHOD3(SubscribeObjectLocations, + ray::Status(const ray::UniqueID &, const ObjectID &, + const OnLocationsFound &)); + MOCK_METHOD2(UnsubscribeObjectLocations, + ray::Status(const ray::UniqueID &, const ObjectID &)); + MOCK_METHOD3(ReportObjectAdded, + ray::Status(const ObjectID &, const ClientID &, const ObjectInfoT &)); + MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &)); + + private: + std::vector> callbacks_; + std::unordered_map> locations_; +}; + +class MockGcs : public gcs::PubsubInterface { + public: + MockGcs() : notification_callback_(nullptr), failure_callback_(nullptr){}; + + void Subscribe(const gcs::TaskLeaseTable::WriteCallback ¬ification_callback, + const gcs::TaskLeaseTable::FailureCallback &failure_callback) { + notification_callback_ = notification_callback; + failure_callback_ = failure_callback; + } + + void Add(const JobID &job_id, const TaskID &task_id, + std::shared_ptr &task_lease_data) { + task_lease_table_[task_id] = task_lease_data; + if (subscribed_tasks_.count(task_id) == 1) { + notification_callback_(nullptr, task_id, *task_lease_data); + } + } + + Status RequestNotifications(const JobID &job_id, const TaskID &task_id, + const ClientID &client_id) { + subscribed_tasks_.insert(task_id); + auto entry = task_lease_table_.find(task_id); + if (entry == task_lease_table_.end()) { + failure_callback_(nullptr, task_id); + } else { + notification_callback_(nullptr, task_id, *entry->second); + } + return ray::Status::OK(); + } + + Status CancelNotifications(const JobID &job_id, const TaskID &task_id, + const ClientID &client_id) { + subscribed_tasks_.erase(task_id); + return ray::Status::OK(); + } + + private: + gcs::TaskLeaseTable::WriteCallback notification_callback_; + gcs::TaskLeaseTable::FailureCallback failure_callback_; + std::unordered_map> task_lease_table_; + std::unordered_set subscribed_tasks_; +}; + +class ReconstructionPolicyTest : public ::testing::Test { + public: + ReconstructionPolicyTest() + : io_service_(), + mock_gcs_(), + mock_object_directory_(std::make_shared()), + reconstruction_timeout_ms_(50), + reconstruction_policy_(std::make_shared( + io_service_, + [this](const TaskID &task_id) { TriggerReconstruction(task_id); }, + reconstruction_timeout_ms_, ClientID::from_random(), mock_gcs_, + mock_object_directory_)), + timer_canceled_(false) { + mock_gcs_.Subscribe( + [this](gcs::AsyncGcsClient *client, const TaskID &task_id, + const TaskLeaseDataT &task_lease) { + reconstruction_policy_->HandleTaskLeaseNotification(task_id, + task_lease.timeout); + }, + [this](gcs::AsyncGcsClient *client, const TaskID &task_id) { + reconstruction_policy_->HandleTaskLeaseNotification(task_id, 0); + }); + } + + void TriggerReconstruction(const TaskID &task_id) { reconstructed_tasks_[task_id]++; } + + void Tick(const std::function &handler, + std::shared_ptr timer, + boost::posix_time::milliseconds timer_period, + const boost::system::error_code &error) { + if (timer_canceled_) { + return; + } + ASSERT_FALSE(error); + handler(); + // Fire the timer again after another period. + timer->expires_from_now(timer_period); + timer->async_wait( + [this, handler, timer, timer_period](const boost::system::error_code &error) { + Tick(handler, timer, timer_period, error); + }); + } + + void SetPeriodicTimer(uint64_t period_ms, const std::function &handler) { + timer_canceled_ = false; + auto timer_period = boost::posix_time::milliseconds(period_ms); + auto timer = std::make_shared(io_service_, timer_period); + timer->async_wait( + [this, handler, timer, timer_period](const boost::system::error_code &error) { + Tick(handler, timer, timer_period, error); + }); + } + + void CancelPeriodicTimer() { timer_canceled_ = true; } + + void Run(uint64_t reconstruction_timeout_ms) { + auto timer_period = boost::posix_time::milliseconds(reconstruction_timeout_ms); + auto timer = std::make_shared(io_service_, timer_period); + timer->async_wait([this, timer](const boost::system::error_code &error) { + ASSERT_FALSE(error); + io_service_.stop(); + }); + io_service_.run(); + io_service_.reset(); + + mock_object_directory_->FlushCallbacks(); + } + + protected: + boost::asio::io_service io_service_; + MockGcs mock_gcs_; + std::shared_ptr mock_object_directory_; + uint64_t reconstruction_timeout_ms_; + std::shared_ptr reconstruction_policy_; + bool timer_canceled_; + std::unordered_map reconstructed_tasks_; +}; + +TEST_F(ReconstructionPolicyTest, TestReconstructionSimple) { + TaskID task_id = TaskID::from_random(); + task_id = FinishTaskId(task_id); + ObjectID object_id = ComputeReturnId(task_id, 1); + + // Listen for an object. + reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + // Run the test for longer than the reconstruction timeout. + Run(reconstruction_timeout_ms_ * 1.1); + // Check that reconstruction was triggered for the task that created the + // object. + ASSERT_EQ(reconstructed_tasks_[task_id], 1); + + // Run the test again. + Run(reconstruction_timeout_ms_ * 1.1); + // Check that reconstruction was triggered again. + ASSERT_EQ(reconstructed_tasks_[task_id], 2); +} + +TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) { + TaskID task_id = TaskID::from_random(); + task_id = FinishTaskId(task_id); + ObjectID object_id = ComputeReturnId(task_id, 1); + mock_object_directory_->SetObjectLocations(object_id, {ClientID::from_random()}); + + // Listen for both objects. + reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + // Run the test for longer than the reconstruction timeout. + Run(reconstruction_timeout_ms_ * 1.1); + // Check that reconstruction was not triggered, since the objects still + // exist on a live node. + ASSERT_EQ(reconstructed_tasks_[task_id], 0); + + // Simulate evicting one of the objects. + mock_object_directory_->SetObjectLocations(object_id, {}); + // Run the test again. + Run(reconstruction_timeout_ms_ * 1.1); + // Check that reconstruction was triggered, since one of the objects was + // evicted. + ASSERT_EQ(reconstructed_tasks_[task_id], 1); +} + +TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) { + // Create two object IDs produced by the same task. + TaskID task_id = TaskID::from_random(); + task_id = FinishTaskId(task_id); + ObjectID object_id1 = ComputeReturnId(task_id, 1); + ObjectID object_id2 = ComputeReturnId(task_id, 2); + + // Listen for both objects. + reconstruction_policy_->ListenAndMaybeReconstruct(object_id1); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id2); + // Run the test for longer than the reconstruction timeout. + Run(reconstruction_timeout_ms_ * 1.1); + // Check that reconstruction is only triggered once for the task that created + // both objects. + ASSERT_EQ(reconstructed_tasks_[task_id], 1); + + // Run the test again. + Run(reconstruction_timeout_ms_ * 1.1); + // Check that reconstruction is again only triggered once. + ASSERT_EQ(reconstructed_tasks_[task_id], 2); +} + +TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) { + TaskID task_id = TaskID::from_random(); + task_id = FinishTaskId(task_id); + ObjectID object_id = ComputeReturnId(task_id, 1); + // Run the test for much longer than the reconstruction timeout. + int64_t test_period = 2 * reconstruction_timeout_ms_; + + // Acquire the task lease for a period longer than the test period. + auto task_lease_data = std::make_shared(); + task_lease_data->node_manager_id = ClientID::from_random().hex(); + task_lease_data->acquired_at = current_sys_time_ms(); + task_lease_data->timeout = 2 * test_period; + mock_gcs_.Add(DriverID::nil(), task_id, task_lease_data); + + // Listen for an object. + reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + // Run the test. + Run(test_period); + // Check that reconstruction is suppressed by the active task lease. + ASSERT_TRUE(reconstructed_tasks_.empty()); + + // Run the test again past the expiration time of the lease. + Run(task_lease_data->timeout * 1.1); + // Check that this time, reconstruction is triggered. + ASSERT_EQ(reconstructed_tasks_[task_id], 1); +} + +TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) { + TaskID task_id = TaskID::from_random(); + task_id = FinishTaskId(task_id); + ObjectID object_id = ComputeReturnId(task_id, 1); + + // Listen for an object. + reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + // Send the reconstruction manager heartbeats about the object. + SetPeriodicTimer(reconstruction_timeout_ms_ / 2, [this, task_id]() { + auto task_lease_data = std::make_shared(); + task_lease_data->node_manager_id = ClientID::from_random().hex(); + task_lease_data->acquired_at = current_sys_time_ms(); + task_lease_data->timeout = reconstruction_timeout_ms_; + mock_gcs_.Add(DriverID::nil(), task_id, task_lease_data); + }); + // Run the test for much longer than the reconstruction timeout. + Run(reconstruction_timeout_ms_ * 2); + // Check that reconstruction is suppressed. + ASSERT_TRUE(reconstructed_tasks_.empty()); + + // Cancel the heartbeats to the reconstruction manager. + CancelPeriodicTimer(); + // Run the test again. + Run(reconstruction_timeout_ms_ * 1.1); + // Check that this time, reconstruction is triggered. + ASSERT_EQ(reconstructed_tasks_[task_id], 1); +} + +TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) { + TaskID task_id = TaskID::from_random(); + task_id = FinishTaskId(task_id); + ObjectID object_id = ComputeReturnId(task_id, 1); + + // Listen for an object. + reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + // Halfway through the reconstruction timeout, cancel the object + // reconstruction. + auto timer_period = boost::posix_time::milliseconds(reconstruction_timeout_ms_); + auto timer = std::make_shared(io_service_, timer_period); + timer->async_wait([this, timer, object_id](const boost::system::error_code &error) { + ASSERT_FALSE(error); + reconstruction_policy_->Cancel(object_id); + }); + Run(reconstruction_timeout_ms_ * 2); + // Check that reconstruction is suppressed. + ASSERT_TRUE(reconstructed_tasks_.empty()); + + // Listen for the object again. + reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + // Run the test again. + Run(reconstruction_timeout_ms_ * 1.1); + // Check that this time, reconstruction is triggered. + ASSERT_EQ(reconstructed_tasks_[task_id], 1); +} + +} // namespace raylet + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index 7874f2313..0adc4e997 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -4,8 +4,18 @@ namespace ray { namespace raylet { -TaskDependencyManager::TaskDependencyManager(ObjectManagerInterface &object_manager) - : object_manager_(object_manager) {} +TaskDependencyManager::TaskDependencyManager( + ObjectManagerInterface &object_manager, + ReconstructionPolicyInterface &reconstruction_policy, + boost::asio::io_service &io_service, const ClientID &client_id, + int64_t initial_lease_period_ms, + gcs::TableInterface &task_lease_table) + : object_manager_(object_manager), + reconstruction_policy_(reconstruction_policy), + io_service_(io_service), + client_id_(client_id), + initial_lease_period_ms_(initial_lease_period_ms), + task_lease_table_(task_lease_table) {} bool TaskDependencyManager::CheckObjectLocal(const ObjectID &object_id) const { return local_objects_.count(object_id) == 1; @@ -44,6 +54,7 @@ void TaskDependencyManager::HandleRemoteDependencyRequired(const ObjectID &objec // If we haven't already, request the object manager to pull it from a // remote node. RAY_CHECK_OK(object_manager_.Pull(object_id)); + reconstruction_policy_.ListenAndMaybeReconstruct(object_id); } } } @@ -55,6 +66,7 @@ void TaskDependencyManager::HandleRemoteDependencyCanceled(const ObjectID &objec auto it = required_objects_.find(object_id); if (it != required_objects_.end()) { RAY_CHECK_OK(object_manager_.Cancel(object_id)); + reconstruction_policy_.Cancel(object_id); required_objects_.erase(it); } } @@ -196,23 +208,71 @@ void TaskDependencyManager::TaskPending(const Task &task) { TaskID task_id = task.GetTaskSpecification().TaskId(); // Record that the task is pending execution. - pending_tasks_.insert(task_id); - // Find any subscribed tasks that are dependent on objects created by the - // pending task. - auto remote_task_entry = required_tasks_.find(task_id); - if (remote_task_entry != required_tasks_.end()) { - for (const auto &object_entry : remote_task_entry->second) { - // This object created by the pending task will appear locally once the - // task completes execution. Cancel any in-progress operations to make - // the object local. - HandleRemoteDependencyCanceled(object_entry.first); + auto inserted = + pending_tasks_.emplace(task_id, PendingTask(initial_lease_period_ms_, io_service_)); + if (inserted.second) { + // This is the first time we've heard that this task is pending. Find any + // subscribed tasks that are dependent on objects created by the pending + // task. + auto remote_task_entry = required_tasks_.find(task_id); + if (remote_task_entry != required_tasks_.end()) { + for (const auto &object_entry : remote_task_entry->second) { + // This object created by the pending task will appear locally once the + // task completes execution. Cancel any in-progress operations to make + // the object local. + HandleRemoteDependencyCanceled(object_entry.first); + } } + + // Acquire the lease for the task's execution in the global lease table. + AcquireTaskLease(task_id); } } +void TaskDependencyManager::AcquireTaskLease(const TaskID &task_id) { + auto it = pending_tasks_.find(task_id); + int64_t now_ms = current_time_ms(); + if (it == pending_tasks_.end()) { + return; + } + + // Check that we were able to renew the task lease before the previous one + // expired. + if (now_ms > it->second.expires_at) { + RAY_LOG(WARNING) << "Task lease to renew has already expired by " + << (it->second.expires_at - now_ms) << "ms"; + } + + auto task_lease_data = std::make_shared(); + task_lease_data->node_manager_id = client_id_.hex(); + task_lease_data->acquired_at = current_sys_time_ms(); + task_lease_data->timeout = it->second.lease_period; + RAY_CHECK_OK(task_lease_table_.Add(DriverID::nil(), task_id, task_lease_data, nullptr)); + + auto period = boost::posix_time::milliseconds(it->second.lease_period / 2); + it->second.lease_timer->expires_from_now(period); + it->second.lease_timer->async_wait( + [this, task_id](const boost::system::error_code &error) { + if (!error) { + AcquireTaskLease(task_id); + } else { + // Check that the error was due to the timer being canceled. + RAY_CHECK(error == boost::asio::error::operation_aborted); + } + }); + + it->second.expires_at = now_ms + it->second.lease_period; + it->second.lease_period *= 2; +} + void TaskDependencyManager::TaskCanceled(const TaskID &task_id) { // Record that the task is no longer pending execution. - pending_tasks_.erase(task_id); + auto it = pending_tasks_.find(task_id); + if (it == pending_tasks_.end()) { + return; + } + pending_tasks_.erase(it); + // Find any subscribed tasks that are dependent on objects created by the // canceled task. auto remote_task_entry = required_tasks_.find(task_id); diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index 1ed7173ef..46caca6cc 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -6,6 +6,7 @@ #include "ray/raylet/task.h" #include "ray/object_manager/object_manager.h" #include "ray/raylet/reconstruction_policy.h" +#include "ray/util/util.h" // clang-format on namespace ray { @@ -27,7 +28,11 @@ class ReconstructionPolicy; class TaskDependencyManager { public: /// Create a task dependency manager. - TaskDependencyManager(ObjectManagerInterface &object_manager); + TaskDependencyManager(ObjectManagerInterface &object_manager, + ReconstructionPolicyInterface &reconstruction_policy, + boost::asio::io_service &io_service, const ClientID &client_id, + int64_t initial_lease_period_ms, + gcs::TableInterface &task_lease_table); /// Check whether an object is locally available. /// @@ -107,6 +112,21 @@ class TaskDependencyManager { int64_t num_missing_dependencies; }; + struct PendingTask { + PendingTask(int64_t initial_lease_period_ms, boost::asio::io_service &io_service) + : lease_period(initial_lease_period_ms), + expires_at(INT64_MAX), + lease_timer(new boost::asio::deadline_timer(io_service)) {} + + /// The timeout within which the lease should be renewed. + int64_t lease_period; + /// The time at which the current lease will expire, according to this + /// node's steady clock. + int64_t expires_at; + /// A timer used to determine when to next renew the lease. + std::unique_ptr lease_timer; + }; + /// Check whether the given object needs to be made available through object /// transfer or reconstruction. These are objects for which: (1) there is a /// subscribed task dependent on it, (2) the object is not local, and (3) the @@ -119,8 +139,29 @@ class TaskDependencyManager { /// operations to make the object available through object transfer or /// reconstruction. void HandleRemoteDependencyCanceled(const ObjectID &object_id); + /// Acquire the task lease in the GCS for the given task. This is used to + /// indicate to other nodes that the task is currently pending on this node. + /// The task lease has an expiration time. If we do not renew the lease + /// before that time, then other nodes may choose to execute the task. + void AcquireTaskLease(const TaskID &task_id); + /// The object manager, used to fetch required objects from remote nodes. ObjectManagerInterface &object_manager_; + /// The reconstruction policy, used to reconstruct required objects that no + /// longer exist on any live nodes. + ReconstructionPolicyInterface &reconstruction_policy_; + /// The event loop, used to set timers for renewing task leases. The task + /// leases are used to indicate which tasks are pending execution on this + /// node and must be periodically renewed. + boost::asio::io_service &io_service_; + /// This node's GCS client ID, used in the task lease information. + const ClientID client_id_; + /// For a given task, the expiration period of the initial task lease that is + /// added to the GCS. The lease expiration period is doubled every time the + /// lease is renewed. + const int64_t initial_lease_period_ms_; + /// The storage system for the task lease table. + gcs::TableInterface &task_lease_table_; /// A mapping from task ID of each subscribed task to its list of object /// dependencies. std::unordered_map task_dependencies_; @@ -137,7 +178,7 @@ class TaskDependencyManager { std::unordered_set local_objects_; /// The set of tasks that are pending execution. Any objects created by these /// tasks that are not already local are pending creation. - std::unordered_set pending_tasks_; + std::unordered_map pending_tasks_; }; } // namespace raylet diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 936bfb749..32c339028 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -19,13 +19,50 @@ class MockObjectManager : public ObjectManagerInterface { MOCK_METHOD1(Cancel, ray::Status(const ObjectID &object_id)); }; +class MockReconstructionPolicy : public ReconstructionPolicyInterface { + public: + MOCK_METHOD1(ListenAndMaybeReconstruct, void(const ObjectID &object_id)); + MOCK_METHOD1(Cancel, void(const ObjectID &object_id)); +}; + +class MockGcs : public gcs::TableInterface { + public: + MOCK_METHOD4( + Add, + ray::Status(const JobID &job_id, const TaskID &task_id, + std::shared_ptr &task_data, + const gcs::TableInterface::WriteCallback &done)); +}; + class TaskDependencyManagerTest : public ::testing::Test { public: TaskDependencyManagerTest() - : object_manager_mock_(), task_dependency_manager_(object_manager_mock_) {} + : object_manager_mock_(), + reconstruction_policy_mock_(), + io_service_(), + gcs_mock_(), + initial_lease_period_ms_(100), + task_dependency_manager_(object_manager_mock_, reconstruction_policy_mock_, + io_service_, ClientID::nil(), initial_lease_period_ms_, + gcs_mock_) {} + + void Run(uint64_t timeout_ms) { + auto timer_period = boost::posix_time::milliseconds(timeout_ms); + auto timer = std::make_shared(io_service_, timer_period); + timer->async_wait([this](const boost::system::error_code &error) { + ASSERT_FALSE(error); + io_service_.stop(); + }); + io_service_.run(); + io_service_.reset(); + } protected: MockObjectManager object_manager_mock_; + MockReconstructionPolicy reconstruction_policy_mock_; + boost::asio::io_service io_service_; + MockGcs gcs_mock_; + int64_t initial_lease_period_ms_; TaskDependencyManager task_dependency_manager_; }; @@ -74,6 +111,7 @@ TEST_F(TaskDependencyManagerTest, TestSimpleTask) { // arguments should be remote. for (const auto &argument_id : arguments) { EXPECT_CALL(object_manager_mock_, Pull(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); } // Subscribe to the task's dependencies. bool ready = task_dependency_manager_.SubscribeDependencies(task_id, arguments); @@ -82,6 +120,7 @@ TEST_F(TaskDependencyManagerTest, TestSimpleTask) { // All arguments should be canceled as they become available locally. for (const auto &argument_id : arguments) { EXPECT_CALL(object_manager_mock_, Cancel(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id)); } // For each argument except the last, tell the task dependency manager that // the argument is local. @@ -110,6 +149,7 @@ TEST_F(TaskDependencyManagerTest, TestDuplicateSubscribe) { // duplicates of previous subscription calls. Each argument should only be // requested from the node manager once. EXPECT_CALL(object_manager_mock_, Pull(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); bool ready = task_dependency_manager_.SubscribeDependencies(task_id, arguments); ASSERT_FALSE(ready); } @@ -117,6 +157,7 @@ TEST_F(TaskDependencyManagerTest, TestDuplicateSubscribe) { // All arguments should be canceled as they become available locally. for (const auto &argument_id : arguments) { EXPECT_CALL(object_manager_mock_, Cancel(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id)); } // For each argument except the last, tell the task dependency manager that // the argument is local. @@ -140,6 +181,7 @@ TEST_F(TaskDependencyManagerTest, TestMultipleTasks) { // The object should only be requested from the object manager once for all // three tasks. EXPECT_CALL(object_manager_mock_, Pull(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); for (int i = 0; i < num_dependent_tasks; i++) { TaskID task_id = TaskID::from_random(); dependent_tasks.push_back(task_id); @@ -150,6 +192,7 @@ TEST_F(TaskDependencyManagerTest, TestMultipleTasks) { // Tell the task dependency manager that the object is local. EXPECT_CALL(object_manager_mock_, Cancel(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id)); auto ready_task_ids = task_dependency_manager_.HandleObjectLocal(argument_id); // Check that all tasks are now ready to run. ASSERT_EQ(ready_task_ids.size(), dependent_tasks.size()); @@ -169,7 +212,9 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) { // No objects should be remote or canceled since each task depends on a // locally queued task. EXPECT_CALL(object_manager_mock_, Pull(_)).Times(0); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_)).Times(0); EXPECT_CALL(object_manager_mock_, Cancel(_)).Times(0); + EXPECT_CALL(reconstruction_policy_mock_, Cancel(_)).Times(0); for (const auto &task : tasks) { // Subscribe to each of the tasks' arguments. auto arguments = task.GetDependencies(); @@ -183,7 +228,9 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) { ASSERT_FALSE(ready); } - // Mark each task as pending. + // Mark each task as pending. A lease entry should be added to the GCS for + // each task. + EXPECT_CALL(gcs_mock_, Add(_, task.GetTaskSpecification().TaskId(), _, _)); task_dependency_manager_.TaskPending(task); i++; @@ -224,6 +271,7 @@ TEST_F(TaskDependencyManagerTest, TestDependentPut) { // No objects have been registered in the task dependency manager, so the put // object should be remote. EXPECT_CALL(object_manager_mock_, Pull(put_id)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(put_id)); // Subscribe to the task's dependencies. bool ready = task_dependency_manager_.SubscribeDependencies( task2.GetTaskSpecification().TaskId(), {put_id}); @@ -232,6 +280,8 @@ TEST_F(TaskDependencyManagerTest, TestDependentPut) { // The put object should be considered local as soon as the task that creates // it is pending execution. EXPECT_CALL(object_manager_mock_, Cancel(put_id)); + EXPECT_CALL(reconstruction_policy_mock_, Cancel(put_id)); + EXPECT_CALL(gcs_mock_, Add(_, task1.GetTaskSpecification().TaskId(), _, _)); task_dependency_manager_.TaskPending(task1); } @@ -244,6 +294,7 @@ TEST_F(TaskDependencyManagerTest, TestTaskForwarding) { auto arguments = task.GetDependencies(); static_cast(task_dependency_manager_.SubscribeDependencies( task.GetTaskSpecification().TaskId(), arguments)); + EXPECT_CALL(gcs_mock_, Add(_, task.GetTaskSpecification().TaskId(), _, _)); task_dependency_manager_.TaskPending(task); } @@ -256,11 +307,13 @@ TEST_F(TaskDependencyManagerTest, TestTaskForwarding) { // The object returned by the first task should be considered remote once we // cancel the forwarded task, since the second task depends on it. EXPECT_CALL(object_manager_mock_, Pull(return_id)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(return_id)); task_dependency_manager_.TaskCanceled(task_id); // Simulate the task executing on a remote node and its return value // appearing locally. EXPECT_CALL(object_manager_mock_, Cancel(return_id)); + EXPECT_CALL(reconstruction_policy_mock_, Cancel(return_id)); auto ready_tasks = task_dependency_manager_.HandleObjectLocal(return_id); // Check that the task that we kept is now ready to run. ASSERT_EQ(ready_tasks.size(), 1); @@ -279,6 +332,7 @@ TEST_F(TaskDependencyManagerTest, TestEviction) { // arguments should be remote. for (const auto &argument_id : arguments) { EXPECT_CALL(object_manager_mock_, Pull(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); } // Subscribe to the task's dependencies. bool ready = task_dependency_manager_.SubscribeDependencies(task_id, arguments); @@ -288,6 +342,7 @@ TEST_F(TaskDependencyManagerTest, TestEviction) { // available. for (const auto &argument_id : arguments) { EXPECT_CALL(object_manager_mock_, Cancel(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id)); } for (size_t i = 0; i < arguments.size(); i++) { std::vector ready_tasks; @@ -304,6 +359,7 @@ TEST_F(TaskDependencyManagerTest, TestEviction) { // considered remote. for (const auto &argument_id : arguments) { EXPECT_CALL(object_manager_mock_, Pull(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); } for (size_t i = 0; i < arguments.size(); i++) { std::vector waiting_tasks; @@ -324,6 +380,7 @@ TEST_F(TaskDependencyManagerTest, TestEviction) { // again. for (const auto &argument_id : arguments) { EXPECT_CALL(object_manager_mock_, Cancel(argument_id)); + EXPECT_CALL(reconstruction_policy_mock_, Cancel(argument_id)); } for (size_t i = 0; i < arguments.size(); i++) { std::vector ready_tasks; @@ -337,6 +394,26 @@ TEST_F(TaskDependencyManagerTest, TestEviction) { } } +TEST_F(TaskDependencyManagerTest, TestTaskLeaseRenewal) { + // Mark a task as pending. + auto task = ExampleTask({}, 0); + // We expect an initial call to acquire the lease. + EXPECT_CALL(gcs_mock_, Add(_, task.GetTaskSpecification().TaskId(), _, _)); + task_dependency_manager_.TaskPending(task); + + // Check that while the task is still pending, there is one call to renew the + // lease for each lease period that passes. The lease period doubles with + // each renewal. + int num_expected_calls = 4; + int64_t sleep_time = 0; + for (int i = 1; i <= num_expected_calls; i++) { + sleep_time += i * initial_lease_period_ms_; + } + EXPECT_CALL(gcs_mock_, Add(_, task.GetTaskSpecification().TaskId(), _, _)) + .Times(num_expected_calls); + Run(sleep_time); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/util/util.h b/src/ray/util/util.h index 42632fa6b..6bf4a165a 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -3,17 +3,27 @@ #include -/// Return the number of milliseconds since the Unix epoch. +/// Return the number of milliseconds since the steady clock epoch. NOTE: The +/// returned timestamp may be used for accurately measuring intervals but has +/// no relation to wall clock time. It must not be used for synchronization +/// across multiple nodes. /// /// TODO(rkn): This function appears in multiple places. It should be /// deduplicated. /// -/// \return The number of milliseconds since the Unix epoch. -int64_t current_time_ms() { +/// \return The number of milliseconds since the steady clock epoch. +inline int64_t current_time_ms() { std::chrono::milliseconds ms_since_epoch = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()); return ms_since_epoch.count(); } +inline int64_t current_sys_time_ms() { + std::chrono::milliseconds ms_since_epoch = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()); + return ms_since_epoch.count(); +} + #endif // RAY_UTIL_UTIL_H