From 677004ee3d2b7c7980e2af3551657edea730c9da Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sat, 28 Dec 2019 14:40:32 -0800 Subject: [PATCH] Add 'ray stat' command for debugging (#6622) * wip * wip * wip * iterate * move * fix thread safety --- doc/source/package-ref.rst | 4 +++ python/ray/scripts/scripts.py | 41 ++++++++++++++++++++++++----- src/ray/core_worker/context.cc | 8 +++--- src/ray/core_worker/context.h | 2 +- src/ray/core_worker/core_worker.cc | 38 ++++++++++++++++++++++++-- src/ray/core_worker/core_worker.h | 21 +++++++++------ src/ray/core_worker/task_manager.h | 3 +++ src/ray/protobuf/common.proto | 10 +++++++ src/ray/protobuf/core_worker.proto | 2 ++ src/ray/protobuf/node_manager.proto | 2 ++ src/ray/raylet/node_manager.cc | 1 + 11 files changed, 111 insertions(+), 21 deletions(-) diff --git a/doc/source/package-ref.rst b/doc/source/package-ref.rst index 584c504d9..b4dbee949 100644 --- a/doc/source/package-ref.rst +++ b/doc/source/package-ref.rst @@ -91,6 +91,10 @@ The Ray Command Line API :prog: ray stack :show-nested: +.. click:: ray.scripts.scripts:stat + :prog: ray stat + :show-nested: + .. click:: ray.scripts.scripts:timeline :prog: ray timeline :show-nested: diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 250ef8a64..a51f29a53 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -823,15 +823,15 @@ def clusterbenchmark(): @cli.command() @click.option( - "--redis-address", + "--address", required=False, type=str, help="Override the redis address to connect to.") -def timeline(redis_address): - if not redis_address: - redis_address = services.find_redis_address_or_die() - logger.info("Connecting to Ray instance at {}.".format(redis_address)) - ray.init(redis_address=redis_address) +def timeline(address): + if not address: + address = services.find_redis_address_or_die() + logger.info("Connecting to Ray instance at {}.".format(address)) + ray.init(address=address) time = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") filename = "/tmp/ray-timeline-{}.json".format(time) ray.timeline(filename=filename) @@ -841,6 +841,34 @@ def timeline(redis_address): "You can open this with chrome://tracing in the Chrome browser.") +@cli.command() +@click.option( + "--address", + required=False, + type=str, + help="Override the address to connect to.") +def stat(address): + if not address: + address = services.find_redis_address_or_die() + logger.info("Connecting to Ray instance at {}.".format(address)) + ray.init(address=address) + + import grpc + from ray.core.generated import node_manager_pb2 + from ray.core.generated import node_manager_pb2_grpc + + for raylet in ray.nodes(): + raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], + ray.nodes()[0]["NodeManagerPort"]) + logger.info("Querying raylet {}".format(raylet_address)) + + channel = grpc.insecure_channel(raylet_address) + stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) + reply = stub.GetNodeStats( + node_manager_pb2.NodeStatsRequest(), timeout=2.0) + print(reply) + + cli.add_command(start) cli.add_command(stop) cli.add_command(create_or_update, name="up") @@ -856,6 +884,7 @@ cli.add_command(get_head_ip, name="get_head_ip") cli.add_command(get_worker_ips) cli.add_command(microbenchmark) cli.add_command(stack) +cli.add_command(stat) cli.add_command(timeline) cli.add_command(project_cli) cli.add_command(session_cli) diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 8cd7015f8..5350a114b 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -60,9 +60,9 @@ WorkerContext::WorkerContext(WorkerType worker_type, const JobID &job_id) // For worker main thread which initializes the WorkerContext, // set task_id according to whether current worker is a driver. // (For other threads it's set to random ID via GetThreadContext). - GetThreadContext(true).SetCurrentTaskId((worker_type_ == WorkerType::DRIVER) - ? TaskID::ForDriverTask(job_id) - : TaskID::Nil()); + GetThreadContext().SetCurrentTaskId((worker_type_ == WorkerType::DRIVER) + ? TaskID::ForDriverTask(job_id) + : TaskID::Nil()); } const WorkerType WorkerContext::GetWorkerType() const { return worker_type_; } @@ -148,7 +148,7 @@ int WorkerContext::CurrentActorMaxConcurrency() const { bool WorkerContext::CurrentActorIsAsync() const { return current_actor_is_asyncio_; } -WorkerThreadContext &WorkerContext::GetThreadContext(bool for_main_thread) { +WorkerThreadContext &WorkerContext::GetThreadContext() { if (thread_context_ == nullptr) { thread_context_ = std::unique_ptr(new WorkerThreadContext()); } diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 4b776ce49..99f4110af 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -72,7 +72,7 @@ class WorkerContext { boost::thread::id main_thread_id_; private: - static WorkerThreadContext &GetThreadContext(bool for_main_thread = false); + static WorkerThreadContext &GetThreadContext(); /// Per-thread worker context. static thread_local std::unique_ptr thread_context_; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ba72e33c1..e4a2ffa66 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -195,6 +195,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, // behaviour. TODO(ekl) backoff exponentially. RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: " << spec.DebugString(); + absl::MutexLock lock(&mutex_); to_resubmit_.push_back(std::make_pair(current_time_ms() + 5000, spec)); })); @@ -292,8 +293,13 @@ void CoreWorker::RunIOService() { void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { worker_context_.SetCurrentTaskId(task_id); main_thread_task_id_ = task_id; + bool not_actor_task = false; + { + absl::MutexLock lock(&mutex_); + not_actor_task = actor_id_.IsNil(); + } // Clear all actor handles at the end of each non-actor task. - if (actor_id_.IsNil() && task_id.IsNil()) { + if (not_actor_task && task_id.IsNil()) { absl::MutexLock lock(&actor_handles_mutex_); for (const auto &handle : actor_handles_) { RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr)); @@ -327,6 +333,7 @@ void CoreWorker::ReportActiveObjectIDs() { } void CoreWorker::InternalHeartbeat() { + absl::MutexLock lock(&mutex_); while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.front().first) { RAY_CHECK_OK(direct_task_submitter_->SubmitTask(to_resubmit_.front().second)); to_resubmit_.pop_front(); @@ -847,6 +854,11 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, worker_context_.SetCurrentTask(task_spec); SetCurrentTaskId(task_spec.TaskId()); + { + absl::MutexLock lock(&mutex_); + current_task_ = task_spec; + } + RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()}; std::vector> args; @@ -868,7 +880,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, return_ids.pop_back(); task_type = TaskType::ACTOR_CREATION_TASK; SetActorId(task_spec.ActorCreationId()); - RAY_LOG(INFO) << "Creating actor: " << actor_id_; + RAY_LOG(INFO) << "Creating actor: " << task_spec.ActorCreationId(); } else if (task_spec.IsActorTask()) { RAY_CHECK(return_ids.size() > 0); return_ids.pop_back(); @@ -908,6 +920,10 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, SetCurrentTaskId(TaskID::Nil()); worker_context_.ResetCurrentTask(task_spec); + { + absl::MutexLock lock(&mutex_); + current_task_ = TaskSpecification(); + } return status; } @@ -1069,7 +1085,14 @@ void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request, void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request, rpc::GetCoreWorkerStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { + absl::MutexLock lock(&mutex_); reply->set_webui_display(webui_display_); + auto stats = reply->mutable_core_worker_stats(); + stats->set_num_pending_tasks(task_manager_->NumPendingTasks()); + stats->set_num_object_ids_in_scope(reference_counter_->NumObjectIDsInScope()); + if (!current_task_.TaskId().IsNil()) { + stats->set_current_task_desc(current_task_.DebugString()); + } send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -1092,4 +1115,15 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c }); } +void CoreWorker::SetActorId(const ActorID &actor_id) { + absl::MutexLock lock(&mutex_); + RAY_CHECK(actor_id_.IsNil()); + actor_id_ = actor_id; +} + +void CoreWorker::SetWebuiDisplay(const std::string &message) { + absl::MutexLock lock(&mutex_); + webui_display_ = message; +} + } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 7d3cf9a5e..34024b73b 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -98,12 +98,9 @@ class CoreWorker { const JobID &GetCurrentJobId() const { return worker_context_.GetCurrentJobID(); } - void SetActorId(const ActorID &actor_id) { - RAY_CHECK(actor_id_.IsNil()); - actor_id_ = actor_id; - } + void SetActorId(const ActorID &actor_id); - void SetWebuiDisplay(const std::string &message) { webui_display_ = message; } + void SetWebuiDisplay(const std::string &message); /// Increase the reference count for this object ID. /// Increase the local reference count for this object ID. Should be called @@ -638,11 +635,19 @@ class CoreWorker { /// Fields related to task execution. /// + /// Protects around accesses to fields below. This should only ever be held + /// for short-running periods of time. + mutable absl::Mutex mutex_; + /// Our actor ID. If this is nil, then we execute only stateless tasks. - ActorID actor_id_; + ActorID actor_id_ GUARDED_BY(mutex_); + + /// The currently executing task spec. We have to track this separately since + /// we cannot access the thread-local worker contexts from GetCoreWorkerStats() + TaskSpecification current_task_ GUARDED_BY(mutex_); /// String to be displayed on Web UI. - std::string webui_display_; + std::string webui_display_ GUARDED_BY(mutex_); /// Event loop where tasks are processed. boost::asio::io_service task_execution_service_; @@ -671,7 +676,7 @@ class CoreWorker { std::unique_ptr direct_task_receiver_; // Queue of tasks to resubmit when the specified time passes. - std::deque> to_resubmit_; + std::deque> to_resubmit_ GUARDED_BY(mutex_); friend class CoreWorkerTest; }; diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 63f3441eb..c23568e25 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -84,6 +84,9 @@ class TaskManager : public TaskFinisherInterface { /// Return the spec for a pending task. TaskSpecification GetTaskSpec(const TaskID &task_id) const; + /// Return the number of pending tasks. + int NumPendingTasks() const { return pending_tasks_.size(); } + private: /// Treat a pending task as failed. The lock should not be held when calling /// this method because it may trigger callbacks in this or other classes. diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index e62cfd025..35214840b 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -161,3 +161,13 @@ message ResourceMapEntry { // The set of resource ids assigned. repeated ResourceId resource_ids = 2; } + +// Debug info returned from the core worker. +message CoreWorkerStats { + // Debug string of the currently executing task. + string current_task_desc = 1; + // Number of pending normal and actor tasks. + int32 num_pending_tasks = 2; + // Number of object ids in local scope. + int32 num_object_ids_in_scope = 3; +} diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 5aebd2495..8f9dd5439 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -133,6 +133,8 @@ message GetCoreWorkerStatsRequest { message GetCoreWorkerStatsReply { // String displayed on Web UI. string webui_display = 1; + // Debug information returned from the core worker. + CoreWorkerStats core_worker_stats = 2; } service CoreWorkerService { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 955c0799d..cfbe0e591 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -55,6 +55,8 @@ message WorkerStats { bool is_driver = 2; // String displayed on Web UI. string webui_display = 3; + // Debug information returned from the core worker. + CoreWorkerStats core_worker_stats = 4; } message ViewData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 1f9f072f1..7bf9316fe 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3013,6 +3013,7 @@ void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request, worker_stats->set_is_driver(false); reply->set_num_workers(reply->num_workers() + 1); worker_stats->set_webui_display(r.webui_display()); + worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats()); if (reply->num_workers() == all_workers.size()) { send_reply_callback(Status::OK(), nullptr, nullptr); }