Raise application-level exceptions for some failure scenarios. (#2429)

* Raise application level exception for actor methods that can't be executed and failed tasks.

* Retry task forwarding for actor tasks.

* Small cleanups

* Move constant to ray_config.

* Create ForwardTaskOrResubmit method.

* Minor

* Clean up queued tasks for dead actors.

* Some cleanups.

* Linting

* Notify task_dependency_manager_ about failed tasks.

* Manage timer lifetime better.

* Use smart pointers to deallocate the timer.

* Fix

* add comment
This commit is contained in:
Robert Nishihara
2018-07-27 16:53:30 -07:00
committed by Stephanie Wang
parent 24649726dc
commit 2be1ccbd8f
12 changed files with 229 additions and 44 deletions
+9
View File
@@ -88,6 +88,10 @@ class RayConfig {
return actor_creation_num_spillbacks_warning_;
}
int node_manager_forward_task_retry_timeout_milliseconds() const {
return node_manager_forward_task_retry_timeout_milliseconds_;
}
int object_manager_pull_timeout_ms() const {
return object_manager_pull_timeout_ms_;
}
@@ -131,6 +135,7 @@ class RayConfig {
L3_cache_size_bytes_(100000000),
max_tasks_to_spillback_(10),
actor_creation_num_spillbacks_warning_(100),
node_manager_forward_task_retry_timeout_milliseconds_(1000),
// TODO: Setting this to large values results in latency, which needs to
// be addressed. This timeout is often on the critical path for object
// transfers.
@@ -226,6 +231,10 @@ class RayConfig {
/// a value of 100 corresponds to a warning every 10 seconds.
int64_t actor_creation_num_spillbacks_warning_;
/// If a node manager attempts to forward a task to another node manager and
/// the forward fails, then it will resubmit the task after this duration.
int64_t node_manager_forward_task_retry_timeout_milliseconds_;
/// Timeout, in milliseconds, to wait before retrying a failed pull in the
/// ObjectManager.
int object_manager_pull_timeout_ms_;
+6 -8
View File
@@ -96,8 +96,7 @@ void ObjectManager::NotifyDirectoryObjectAdd(const ObjectInfoT &object_info) {
if (iter != unfulfilled_push_requests_.end()) {
for (auto &pair : iter->second) {
auto &client_id = pair.first;
main_service_->post(
[this, object_id, client_id]() { RAY_CHECK_OK(Push(object_id, client_id)); });
main_service_->post([this, object_id, client_id]() { Push(object_id, client_id); });
// When push timeout is set to -1, there will be an empty timer in pair.second.
if (pair.second != nullptr) {
pair.second->cancel();
@@ -226,7 +225,7 @@ void ObjectManager::HandlePushTaskTimeout(const ObjectID &object_id,
}
}
ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
if (local_objects_.count(object_id) == 0) {
// Avoid setting duplicated timer for the same object and client pair.
auto &clients = unfulfilled_push_requests_[object_id];
@@ -256,12 +255,12 @@ ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &clien
clients.emplace(client_id, std::move(timer));
}
}
return ray::Status::OK();
return;
}
// TODO(hme): Cache this data in ObjectDirectory.
// Okay for now since the GCS client caches this data.
Status status = object_directory_->GetInformation(
RAY_CHECK_OK(object_directory_->GetInformation(
client_id,
[this, object_id, client_id](const RemoteConnectionInfo &info) {
const ObjectInfoT &object_info = local_objects_[object_id];
@@ -279,8 +278,7 @@ ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &clien
},
[](const Status &status) {
// Push is best effort, so do nothing here.
});
return status;
}));
}
void ObjectManager::ExecuteSendObject(const ClientID &client_id,
@@ -595,7 +593,7 @@ void ObjectManager::ReceivePullRequest(std::shared_ptr<TcpClientConnection> &con
auto pr = flatbuffers::GetRoot<object_manager_protocol::PullRequestMessage>(message);
ObjectID object_id = ObjectID::from_binary(pr->object_id()->str());
ClientID client_id = ClientID::from_binary(pr->client_id()->str());
ray::Status push_status = Push(object_id, client_id);
Push(object_id, client_id);
conn->ProcessMessages();
}
+3 -3
View File
@@ -38,7 +38,7 @@ struct ObjectManagerConfig {
int max_receives;
/// Object chunk size, in bytes
uint64_t object_chunk_size;
/// The stored socked name.
/// The store socket name.
std::string store_socket_name;
/// The time in milliseconds to wait until a Push request
/// fails due to unsatisfied local object. Special value:
@@ -101,8 +101,8 @@ class ObjectManager : public ObjectManagerInterface {
///
/// \param object_id The object's object id.
/// \param client_id The remote node's client id.
/// \return Status of whether the push request successfully initiated.
ray::Status Push(const ObjectID &object_id, const ClientID &client_id);
/// \return Void.
void Push(const ObjectID &object_id, const ClientID &client_id);
/// Pull an object from ClientID. Returns UniqueID asociated with
/// an invocation of this method.
@@ -362,21 +362,21 @@ class StressTestObjectManager : public TestObjectManagerBase {
case TransferPattern::PUSH_A_B: {
for (int i = -1; ++i < num_trials;) {
ObjectID oid1 = WriteDataToClient(client1, data_size);
status = server1->object_manager_.Push(oid1, client_id_2);
server1->object_manager_.Push(oid1, client_id_2);
}
} break;
case TransferPattern::PUSH_B_A: {
for (int i = -1; ++i < num_trials;) {
ObjectID oid2 = WriteDataToClient(client2, data_size);
status = server2->object_manager_.Push(oid2, client_id_1);
server2->object_manager_.Push(oid2, client_id_1);
}
} break;
case TransferPattern::BIDIRECTIONAL_PUSH: {
for (int i = -1; ++i < num_trials;) {
ObjectID oid1 = WriteDataToClient(client1, data_size);
status = server1->object_manager_.Push(oid1, client_id_2);
server1->object_manager_.Push(oid1, client_id_2);
ObjectID oid2 = WriteDataToClient(client2, data_size);
status = server2->object_manager_.Push(oid2, client_id_1);
server2->object_manager_.Push(oid2, client_id_1);
}
} break;
case TransferPattern::PULL_A_B: {
@@ -246,14 +246,14 @@ class TestObjectManager : public TestObjectManagerBase {
// dummy_id is not local. The push function will timeout.
ObjectID dummy_id = ObjectID::from_random();
status = server1->object_manager_.Push(
dummy_id, gcs_client_2->client_table().GetLocalClientId());
server1->object_manager_.Push(dummy_id,
gcs_client_2->client_table().GetLocalClientId());
created_object_id1 = ObjectID::from_random();
WriteDataToClient(client1, data_size, created_object_id1);
// Server1 holds Object1 so this Push call will success.
status = server1->object_manager_.Push(
created_object_id1, gcs_client_2->client_table().GetLocalClientId());
server1->object_manager_.Push(created_object_id1,
gcs_client_2->client_table().GetLocalClientId());
// This timer is used to guarantee that the Push function for dummy_id will timeout.
timer.reset(new boost::asio::deadline_timer(main_service));
+5
View File
@@ -8,6 +8,7 @@ namespace raylet {
ActorRegistration::ActorRegistration(const ActorTableDataT &actor_table_data)
: actor_table_data_(actor_table_data),
alive_(true),
execution_dependency_(ObjectID::nil()),
frontier_() {}
@@ -36,6 +37,10 @@ void ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id,
execution_dependency_ = execution_dependency;
}
bool ActorRegistration::IsAlive() const { return alive_; }
void ActorRegistration::MarkDead() { alive_ = false; }
} // namespace raylet
} // namespace ray
+14 -1
View File
@@ -14,7 +14,8 @@ namespace raylet {
///
/// Information about an actor registered in the system. This includes the
/// actor's current node manager location, and if local, information about its
/// current execution state, used for reconstruction purposes.
/// current execution state, used for reconstruction purposes, and whether the
/// actor is currently alive or not.
class ActorRegistration {
public:
/// Create an actor registration.
@@ -74,10 +75,22 @@ class ActorRegistration {
void ExtendFrontier(const ActorHandleID &handle_id,
const ObjectID &execution_dependency);
/// Return whether the actor is alive or not. This should only be called on
/// local actors.
///
/// \return True if the local actor is alive and false if it is dead.
bool IsAlive() const;
/// Mark the actor as dead.
/// \return Void.
void MarkDead();
private:
/// Information from the global actor table about this actor, including the
/// node manager location.
ActorTableDataT actor_table_data_;
/// True if the actor is alive and false otherwise.
bool alive_;
/// The object representing the state following the actor's most recently
/// executed task. The next task to execute on the actor should be marked as
/// execution-dependent on this object.
+1
View File
@@ -48,6 +48,7 @@ int main(int argc, char *argv[]) {
node_manager_config.heartbeat_period_ms =
RayConfig::instance().heartbeat_timeout_milliseconds();
node_manager_config.max_lineage_size = RayConfig::instance().max_lineage_size();
node_manager_config.store_socket_name = store_socket_name;
// Configuration for the object manager.
ray::ObjectManagerConfig object_manager_config;
+139 -14
View File
@@ -111,6 +111,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
}));
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
[this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));
ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str(), "", 0));
}
ray::Status NodeManager::RegisterGcs() {
@@ -359,6 +361,38 @@ void NodeManager::HandleActorCreation(const ActorID &actor_id,
}
}
void NodeManager::GetActorTasksFromList(const ActorID &actor_id,
const std::list<Task> &tasks,
std::unordered_set<TaskID> &tasks_to_remove) {
for (auto const &task : tasks) {
auto const &spec = task.GetTaskSpecification();
if (actor_id == spec.ActorId()) {
tasks_to_remove.insert(spec.TaskId());
}
}
}
void NodeManager::CleanUpTasksForDeadActor(const ActorID &actor_id) {
// TODO(rkn): The code below should be cleaned up when we improve the
// SchedulingQueue API.
std::unordered_set<TaskID> tasks_to_remove;
GetActorTasksFromList(actor_id, local_queues_.GetUncreatedActorMethods(),
tasks_to_remove);
GetActorTasksFromList(actor_id, local_queues_.GetWaitingTasks(), tasks_to_remove);
GetActorTasksFromList(actor_id, local_queues_.GetPlaceableTasks(), tasks_to_remove);
GetActorTasksFromList(actor_id, local_queues_.GetReadyTasks(), tasks_to_remove);
GetActorTasksFromList(actor_id, local_queues_.GetRunningTasks(), tasks_to_remove);
GetActorTasksFromList(actor_id, local_queues_.GetBlockedTasks(), tasks_to_remove);
auto removed_tasks = local_queues_.RemoveTasks(tasks_to_remove);
for (auto const &task : removed_tasks) {
const TaskSpecification &spec = task.GetTaskSpecification();
TreatTaskAsFailed(spec);
task_dependency_manager_.TaskCanceled(spec.TaskId());
}
}
void NodeManager::ProcessNewClient(LocalClientConnection &client) {
// The new client is a worker, so begin listening for messages.
client.ProcessMessages();
@@ -437,7 +471,8 @@ void NodeManager::ProcessClientMessage(
});
RAY_CHECK(running_tasks.size() != 0);
RAY_CHECK(it != running_tasks.end());
JobID job_id = it->GetTaskSpecification().DriverId();
const TaskSpecification &spec = it->GetTaskSpecification();
const JobID job_id = spec.DriverId();
// TODO(rkn): Define this constant somewhere else.
std::string type = "worker_died";
std::ostringstream error_message;
@@ -445,10 +480,30 @@ void NodeManager::ProcessClientMessage(
<< ".";
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
job_id, type, error_message.str(), current_time_ms()));
// Handle the task failure in order to raise an exception in the
// application.
TreatTaskAsFailed(spec);
task_dependency_manager_.TaskCanceled(spec.TaskId());
local_queues_.RemoveTask(spec.TaskId());
}
worker_pool_.DisconnectWorker(worker);
// If the worker was an actor, add it to the list of dead actors.
const ActorID actor_id = worker->GetActorId();
if (!actor_id.is_nil()) {
// TODO(rkn): Consider broadcasting a message to all of the other
// node managers so that they can mark the actor as dead.
RAY_LOG(DEBUG) << "The actor with ID " << actor_id << " died.";
auto actor_entry = actor_registry_.find(actor_id);
RAY_CHECK(actor_entry != actor_registry_.end());
actor_entry->second.MarkDead();
// For dead actors, if there are remaining tasks for this actor, we
// should handle them.
CleanUpTasksForDeadActor(actor_id);
}
const ClientID &client_id = gcs_client_->client_table().GetLocalClientId();
// Return the resources that were being used by this worker.
@@ -652,15 +707,16 @@ void NodeManager::ScheduleTasks() {
std::unordered_set<TaskID> local_task_ids;
// Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node.
for (const auto &task_schedule : policy_decision) {
TaskID task_id = task_schedule.first;
ClientID client_id = task_schedule.second;
const TaskID task_id = task_schedule.first;
const ClientID client_id = task_schedule.second;
if (client_id == gcs_client_->client_table().GetLocalClientId()) {
local_task_ids.insert(task_id);
} else {
// TODO(atumanov): need a better interface for task exit on forward.
const auto task = local_queues_.RemoveTask(task_id);
// TODO(swang): Handle forward task failure.
RAY_CHECK_OK(ForwardTask(task, client_id));
// Attempt to forward the task. If this fails to forward the task,
// the task will be resubmit locally.
ForwardTaskOrResubmit(task, client_id);
}
}
@@ -673,6 +729,34 @@ void NodeManager::ScheduleTasks() {
}
}
void NodeManager::TreatTaskAsFailed(const TaskSpecification &spec) {
RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed.";
// Loop over the return IDs (except the dummy ID) and store a fake object in
// the object store.
int64_t num_returns = spec.NumReturns();
if (spec.IsActorTask()) {
// TODO(rkn): We subtract 1 to avoid the dummy ID. However, this leaks
// information about the TaskSpecification implementation.
num_returns -= 1;
}
for (int64_t i = 0; i < num_returns; i++) {
const ObjectID object_id = spec.ReturnId(i);
std::shared_ptr<Buffer> data;
// TODO(ekl): this writes an invalid arrow object, which is sufficient to
// signal that the worker failed, but it would be nice to return more
// detailed failure metadata in the future.
arrow::Status status =
store_client_.Create(object_id.to_plasma_id(), 1, NULL, 0, &data);
if (!status.IsPlasmaObjectExists()) {
// TODO(rkn): We probably don't want this checks. E.g., if the object
// store is full, we don't want to kill the raylet.
ARROW_CHECK_OK(status);
ARROW_CHECK_OK(store_client_.Seal(object_id.to_plasma_id()));
}
}
}
void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage,
bool forwarded) {
// Add the task and its uncommitted lineage to the lineage cache.
@@ -690,13 +774,26 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
// We have a known location for the actor.
auto node_manager_id = actor_entry->second.GetNodeManagerId();
if (node_manager_id == gcs_client_->client_table().GetLocalClientId()) {
// The actor is local. Queue the task for local execution, bypassing placement.
EnqueuePlaceableTask(task);
// The actor is local. Check if the actor is still alive.
if (!actor_entry->second.IsAlive()) {
// Handle the fact that this actor is dead.
TreatTaskAsFailed(spec);
} else {
// Queue the task for local execution, bypassing placement.
EnqueuePlaceableTask(task);
}
} else {
// The actor is remote. Forward the task to the node manager that owns
// the actor.
// TODO(swang): Handle forward task failure.
RAY_CHECK_OK(ForwardTask(task, node_manager_id));
if (removed_clients_.find(node_manager_id) != removed_clients_.end()) {
// The remote node manager is dead, so handle the fact that this actor
// is also dead.
TreatTaskAsFailed(spec);
} else {
// Attempt to forward the task. If this fails to forward the task,
// the task will be resubmit locally.
ForwardTaskOrResubmit(task, node_manager_id);
}
}
} else {
// We do not have a registered location for the object, so either the
@@ -1033,6 +1130,35 @@ void NodeManager::HandleObjectMissing(const ObjectID &object_id) {
}
}
void NodeManager::ForwardTaskOrResubmit(const Task &task,
const ClientID &node_manager_id) {
/// TODO(rkn): Should we check that the node manager is remote and not local?
/// TODO(rkn): Should we check if the remote node manager is known to be dead?
const TaskID task_id = task.GetTaskSpecification().TaskId();
// Attempt to forward the task.
if (!ForwardTask(task, node_manager_id).ok()) {
RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
<< node_manager_id;
// Create a timer to resubmit the task in a little bit. TODO(rkn): Really
// this should be a unique_ptr instead of a shared_ptr. However, it's a
// little harder to move unique_ptrs into lambdas.
auto retry_timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
auto retry_duration = boost::posix_time::milliseconds(
RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait(
[this, task, task_id, retry_timer](const boost::system::error_code &error) {
// Timer killing will receive the boost::asio::error::operation_aborted,
// we only handle the timeout event.
RAY_CHECK(!error);
RAY_LOG(INFO) << "In ForwardTask retry callback for task " << task_id;
EnqueuePlaceableTask(task);
});
}
}
ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id) {
const auto &spec = task.GetTaskSpecification();
auto task_id = spec.TaskId();
@@ -1086,16 +1212,15 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id)
ObjectID argument_id = spec.ArgId(i, j);
// If the argument is local, then push it to the receiving node.
if (task_dependency_manager_.CheckObjectLocal(argument_id)) {
RAY_CHECK_OK(object_manager_.Push(argument_id, node_id));
object_manager_.Push(argument_id, node_id);
}
}
}
}
} else {
// TODO(atumanov): caller must handle ForwardTask failure to ensure tasks are not
// lost.
RAY_LOG(FATAL) << "[NodeManager][ForwardTask] failed to forward task " << task_id
<< " to node " << node_id;
// TODO(atumanov): caller must handle ForwardTask failure.
RAY_LOG(WARNING) << "[NodeManager][ForwardTask] failed to forward task " << task_id
<< " to node " << node_id;
}
return status;
}
+43
View File
@@ -26,6 +26,8 @@ struct NodeManagerConfig {
std::vector<std::string> worker_command;
uint64_t heartbeat_period_ms;
uint64_t max_lineage_size;
/// The store socket name.
std::string store_socket_name;
};
class NodeManager {
@@ -73,6 +75,14 @@ class NodeManager {
/// Methods for task scheduling.
/// Enqueue a placeable task to wait on object dependencies or be ready for dispatch.
void EnqueuePlaceableTask(const Task &task);
/// This will treat the task as if it had been executed and failed. This is
/// done by looping over the task return IDs and for each ID storing an object
/// that represents a failure in the object store. When clients retrieve these
/// objects, they will raise application-level exceptions.
///
/// \param spec The specification of the task.
/// \return Void.
void TreatTaskAsFailed(const TaskSpecification &spec);
/// Handle specified task's submission to the local node manager.
void SubmitTask(const Task &task, const Lineage &uncommitted_lineage,
bool forwarded = false);
@@ -84,6 +94,12 @@ class NodeManager {
void ScheduleTasks();
/// Resubmit a task whose return value needs to be reconstructed.
void ResubmitTask(const TaskID &task_id);
/// Attempt to forward a task to a remote different node manager. If this
/// fails, the task will be resubmit locally.
///
/// \param task The task in question.
/// \param node_manager_id The ID of the remote node manager.
void ForwardTaskOrResubmit(const Task &task, const ClientID &node_manager_id);
/// Forward a task to another node to execute. The task is assumed to not be
/// queued in local_queues_.
ray::Status ForwardTask(const Task &task, const ClientID &node_id);
@@ -100,6 +116,27 @@ class NodeManager {
void HandleActorCreation(const ActorID &actor_id,
const std::vector<ActorTableDataT> &data);
/// TODO(rkn): This should probably be removed when we improve the
/// SchedulingQueue API. This is a helper function for
/// CleanUpTasksForDeadActor.
///
/// This essentially loops over all of the tasks in the provided list and
/// finds The IDs of the tasks that belong to the given actor.
///
/// \param actor_id The actor to get the tasks for.
/// \param tasks A list of tasks to extract from.
/// \param tasks_to_remove The task IDs of the extracted tasks are inserted in
/// this vector.
void GetActorTasksFromList(const ActorID &actor_id, const std::list<Task> &tasks,
std::unordered_set<TaskID> &tasks_to_remove);
/// When an actor dies, loop over all of the queued tasks for that actor and
/// treat them as failed.
///
/// \param actor_id The actor that died.
/// \return Void.
void CleanUpTasksForDeadActor(const ActorID &actor_id);
/// Methods for managing object dependencies.
/// Handle a dependency required by a queued task that is missing locally.
/// The dependency is (1) on a remote node, (2) pending creation on a remote
@@ -117,6 +154,10 @@ class NodeManager {
boost::asio::io_service &io_service_;
ObjectManager &object_manager_;
/// A Plasma object store client. This is used exclusively for creating new
/// objects in the object store (e.g., for actor tasks that can't be run
/// because the actor died).
plasma::PlasmaClient store_client_;
/// A client connection to the GCS.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
boost::asio::deadline_timer heartbeat_timer_;
@@ -143,6 +184,8 @@ class NodeManager {
/// this could grow unbounded.
std::unordered_set<ClientID> removed_clients_;
std::unordered_map<ClientID, TcpServerConnection> remote_server_connections_;
/// A mapping from actor ID to registration information about that actor
/// (including which node manager owns it).
std::unordered_map<ActorID, ActorRegistration> actor_registry_;
};
@@ -174,7 +174,7 @@ class TestObjectManagerIntegration : public TestObjectManagerBase {
num_expected_objects = (uint)1;
ObjectID oid1 = WriteDataToClient(client1, data_size);
status = server1->object_manager_.Push(oid1, client_id_2);
server1->object_manager_.Push(oid1, client_id_2);
}
void TestPushComplete() {
-9
View File
@@ -300,9 +300,6 @@ class WorkerDeath(unittest.TestCase):
self.assertIn("died or was killed while executing",
ray.error_info()[0]["message"])
@unittest.skipIf(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
def testActorWorkerDying(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
@@ -321,9 +318,6 @@ class WorkerDeath(unittest.TestCase):
self.assertRaises(Exception, lambda: ray.get(consume.remote(obj)))
wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
@unittest.skipIf(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
def testActorWorkerDyingFutureTasks(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)
@@ -346,9 +340,6 @@ class WorkerDeath(unittest.TestCase):
wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
@unittest.skipIf(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
def testActorWorkerDyingNothingInProgress(self):
ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)