diff --git a/doc/source/package-ref.rst b/doc/source/package-ref.rst index ac290fbe8..b7dcb165a 100644 --- a/doc/source/package-ref.rst +++ b/doc/source/package-ref.rst @@ -97,6 +97,14 @@ The Ray Command Line API :prog: ray stat :show-nested: +.. click:: ray.scripts.scripts:memory + :prog: ray memory + :show-nested: + +.. click:: ray.scripts.scripts:globalgc + :prog: ray globalgc + :show-nested: + .. click:: ray.scripts.scripts:timeline :prog: ray timeline :show-nested: diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3124bf685..ca7197a10 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -11,6 +11,7 @@ import numpy import gc import inspect import threading +import traceback import time import logging import os @@ -558,6 +559,46 @@ cdef void gc_collect() nogil: num_freed, end - start)) +# This function introduces ~2-7us of overhead per call (i.e., it can be called +# up to hundreds of thousands of times per second). +cdef void get_py_stack(c_string* stack_out) nogil: + """Get the Python call site. + + This can be called from within C++ code to retrieve the file name and line + number of the Python code that is calling into the core worker. + """ + + with gil: + frame = inspect.currentframe() + msg = "" + while frame: + filename = frame.f_code.co_filename + # Decode Ray internal frames to add annotations. + if filename.endswith("ray/worker.py"): + if frame.f_code.co_name == "put": + msg = "(put object) " + elif filename.endswith("ray/workers/default_worker.py"): + pass + elif filename.endswith("ray/remote_function.py"): + # TODO(ekl) distinguish between task return objects and + # arguments. This can only be done in the core worker. + msg = "(task call) " + elif filename.endswith("ray/actor.py"): + # TODO(ekl) distinguish between actor return objects and + # arguments. This can only be done in the core worker. 
+ msg = "(actor call) " + elif filename.endswith("ray/serialization.py"): + if frame.f_code.co_name == "id_deserializer": + msg = "(deserialize task arg) " + else: + msg += "{}:{}:{}".format( + frame.f_code.co_filename, frame.f_code.co_name, + frame.f_lineno) + break + frame = frame.f_back + stack_out[0] = msg.encode("ascii") + + cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str): cdef shared_ptr[CBuffer] empty_metadata if c_str.size() == 0: @@ -603,7 +644,8 @@ cdef class CoreWorker: raylet_socket.encode("ascii"), job_id.native(), gcs_options.native()[0], log_dir.encode("utf-8"), node_ip_address.encode("utf-8"), node_manager_port, - task_execution_handler, check_signals, gc_collect, True)) + task_execution_handler, check_signals, gc_collect, + get_py_stack, True)) def run_task_loop(self): with nogil: diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 7558ea4c8..9daf0dee8 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -98,6 +98,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CWorkerID &worker_id) nogil, CRayStatus() nogil, void() nogil, + void(c_string *stack_out) nogil, c_bool ref_counting_enabled) CWorkerType &GetWorkerType() CLanguage &GetLanguage() @@ -188,7 +189,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: unordered_map[CObjectID, pair[size_t, size_t]] GetAllReferenceCounts() void GetAsync(const CObjectID &object_id, - ray_callback_function successs_callback, + ray_callback_function success_callback, ray_callback_function fallback_callback, void* python_future) diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index d685cf7b3..14d196095 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -11,6 +11,24 @@ def global_gc(): worker.core_worker.global_gc() +def memory_summary(): + """Returns a formatted string describing memory usage in the cluster.""" 
+ + import grpc + from ray.core.generated import node_manager_pb2 + from ray.core.generated import node_manager_pb2_grpc + + # We can ask any Raylet for the global memory info. + raylet = ray.nodes()[0] + raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], + ray.nodes()[0]["NodeManagerPort"]) + channel = grpc.insecure_channel(raylet_address) + stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) + reply = stub.FormatGlobalMemoryInfo( + node_manager_pb2.FormatGlobalMemoryInfoRequest(), timeout=30.0) + return reply.memory_summary + + def free(object_ids, local_only=False, delete_creating_tasks=False): """Free a list of IDs from object stores. diff --git a/python/ray/memory_monitor.py b/python/ray/memory_monitor.py index a7d483e0a..aea7c939c 100644 --- a/python/ray/memory_monitor.py +++ b/python/ray/memory_monitor.py @@ -53,11 +53,9 @@ class RayOutOfMemoryError(Exception): round(get_shared(psutil.virtual_memory()) / (1024**3), 2)) + "currently being used by the Ray object store. You can set " "the object store size with the `object_store_memory` " - "parameter when starting Ray, and the max Redis size with " - "`redis_max_memory`. Note that Ray assumes all system " - "memory is available for use by workers. 
If your system " - "has other applications running, you should manually set " - "these memory limits to a lower value.") + "parameter when starting Ray.\n---\n" + "--- Tip: Use the `ray memory` command to list active " + "objects in the cluster.\n---\n") class MemoryMonitor: diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 48880e10a..535eff7fb 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -945,10 +945,40 @@ def stat(address): channel = grpc.insecure_channel(raylet_address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) reply = stub.GetNodeStats( - node_manager_pb2.GetNodeStatsRequest(), timeout=2.0) + node_manager_pb2.GetNodeStatsRequest(include_memory_info=False), + timeout=2.0) print(reply) +@cli.command() +@click.option( + "--address", + required=False, + type=str, + help="Override the address to connect to.") +def memory(address): + if not address: + address = services.find_redis_address_or_die() + logger.info("Connecting to Ray instance at {}.".format(address)) + ray.init(address=address) + print(ray.internal.internal_api.memory_summary()) + + +@cli.command() +@click.option( + "--address", + required=False, + type=str, + help="Override the address to connect to.") +def globalgc(address): + if not address: + address = services.find_redis_address_or_die() + logger.info("Connecting to Ray instance at {}.".format(address)) + ray.init(address=address) + ray.internal.internal_api.global_gc() + print("Triggered gc.collect() on all workers.") + + cli.add_command(dashboard) cli.add_command(start) cli.add_command(stop) @@ -966,6 +996,8 @@ cli.add_command(get_worker_ips) cli.add_command(microbenchmark) cli.add_command(stack) cli.add_command(stat) +cli.add_command(memory) +cli.add_command(globalgc) cli.add_command(timeline) cli.add_command(project_cli) cli.add_command(session_cli) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 5fd0aeb6b..f94f6304d 100644 --- 
a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -22,6 +22,14 @@ py_test( deps = ["//:ray_lib"], ) +py_test( + name = "test_memstat", + size = "small", + srcs = ["test_memstat.py"], + tags = ["exclusive"], + deps = ["//:ray_lib"], +) + py_test( name = "test_iter", size = "medium", diff --git a/python/ray/tests/test_memstat.py b/python/ray/tests/test_memstat.py new file mode 100644 index 000000000..3a69329cf --- /dev/null +++ b/python/ray/tests/test_memstat.py @@ -0,0 +1,195 @@ +import ray +import numpy as np +import time +from ray.internal.internal_api import memory_summary + +# Unique strings. +DRIVER_PID = "driver pid" +WORKER_PID = "worker pid" +UNKNOWN_SIZE = " ? " + +# Reference status. +PINNED_IN_MEMORY = "PINNED_IN_MEMORY" +LOCAL_REF = "LOCAL_REFERENCE" +USED_BY_PENDING_TASK = "USED_BY_PENDING_TASK" +CAPTURED_IN_OBJECT = "CAPTURED_IN_OBJECT" + +# Call sites. +PUT_OBJ = "(put object)" +TASK_CALL_OBJ = "(task call)" +ACTOR_TASK_CALL_OBJ = "(actor call)" +DESER_TASK_ARG = "(deserialize task arg)" +DESER_ACTOR_TASK_ARG = "(deserialize actor task arg)" + + +def data_lines(memory_str): + for line in memory_str.split("\n"): + if (not line or "---" in line or "===" in line or "Object ID" in line + or "pid=" in line): + continue + yield line + + +def num_objects(memory_str): + n = 0 + for line in data_lines(memory_str): + n += 1 + return n + + +def count(memory_str, substr): + n = 0 + for line in memory_str.split("\n"): + if substr in line: + n += 1 + return n + + +def test_driver_put_ref(ray_start_regular): + info = memory_summary() + assert num_objects(info) == 0, info + x_id = ray.put("HI") + info = memory_summary() + print(info) + assert num_objects(info) == 1, info + assert count(info, DRIVER_PID) == 1, info + assert count(info, WORKER_PID) == 0, info + del x_id + info = memory_summary() + assert num_objects(info) == 0, info + + +def test_worker_task_refs(ray_start_regular): + @ray.remote + def f(y): + x_id = ray.put("HI") + info = memory_summary() + del 
x_id + return info + + x_id = f.remote(np.zeros(100000)) + info = ray.get(x_id) + print(info) + assert num_objects(info) == 4, info + # Task argument plus task return ids. + assert count(info, TASK_CALL_OBJ) == 2, info + assert count(info, DRIVER_PID) == 1, info + assert count(info, WORKER_PID) == 1, info + assert count(info, LOCAL_REF) == 2, info + assert count(info, PINNED_IN_MEMORY) == 1, info + assert count(info, PUT_OBJ) == 1, info + assert count(info, DESER_TASK_ARG) == 1, info + assert count(info, UNKNOWN_SIZE) == 1, info + assert count(info, "test_memstat.py:f") == 1, info + assert count(info, "test_memstat.py:test_worker_task_refs") == 2, info + + info = memory_summary() + print(info) + assert num_objects(info) == 1, info + assert count(info, DRIVER_PID) == 1, info + assert count(info, TASK_CALL_OBJ) == 1, info + assert count(info, UNKNOWN_SIZE) == 0, info + assert count(info, x_id.hex()) == 1, info + + del x_id + info = memory_summary() + assert num_objects(info) == 0, info + + +def test_actor_task_refs(ray_start_regular): + @ray.remote + class Actor: + def __init__(self): + self.refs = [] + + def f(self, x): + self.refs.append(x) + return memory_summary() + + def make_actor(): + return Actor.remote() + + actor = make_actor() + x_id = actor.f.remote(np.zeros(100000)) + info = ray.get(x_id) + print(info) + assert num_objects(info) == 4, info + # Actor handle, task argument id, task return id. + assert count(info, ACTOR_TASK_CALL_OBJ) == 3, info + assert count(info, DRIVER_PID) == 1, info + assert count(info, WORKER_PID) == 1, info + assert count(info, LOCAL_REF) == 1, info + assert count(info, PINNED_IN_MEMORY) == 1, info + assert count(info, USED_BY_PENDING_TASK) == 2, info + assert count(info, DESER_ACTOR_TASK_ARG) == 1, info + assert count(info, "test_memstat.py:test_actor_task_refs") == 2, info + assert count(info, "test_memstat.py:make_actor") == 1, info + del x_id + + # These should accumulate in the actor. 
+ for _ in range(5): + ray.get(actor.f.remote([ray.put(np.zeros(100000))])) + info = memory_summary() + print(info) + assert count(info, DESER_ACTOR_TASK_ARG) == 5, info + assert count(info, ACTOR_TASK_CALL_OBJ) == 1, info + + # Cleanup. + del actor + time.sleep(1) + info = memory_summary() + assert num_objects(info) == 0, info + + +def test_nested_object_refs(ray_start_regular): + x_id = ray.put(np.zeros(100000)) + y_id = ray.put([x_id]) + z_id = ray.put([y_id]) + del x_id, y_id + info = memory_summary() + print(info) + assert num_objects(info) == 3, info + assert count(info, LOCAL_REF) == 1, info + assert count(info, CAPTURED_IN_OBJECT) == 2, info + del z_id + + +def test_pinned_object_call_site(ray_start_regular): + # Local ref only. + x_id = ray.put(np.zeros(100000)) + info = memory_summary() + print(info) + assert num_objects(info) == 1, info + assert count(info, LOCAL_REF) == 1, info + assert count(info, PINNED_IN_MEMORY) == 0, info + assert count(info, "test_memstat.py") == 1, info + + # Local ref + pinned buffer. + buf = ray.get(x_id) + info = memory_summary() + print(info) + assert num_objects(info) == 1, info + assert count(info, LOCAL_REF) == 0, info + assert count(info, PINNED_IN_MEMORY) == 1, info + assert count(info, "test_memstat.py") == 1, info + + # Just pinned buffer. + del x_id + info = memory_summary() + print(info) + assert num_objects(info) == 1, info + assert count(info, LOCAL_REF) == 0, info + assert count(info, PINNED_IN_MEMORY) == 1, info + assert count(info, "test_memstat.py") == 1, info + + # Nothing. 
+ del buf + info = memory_summary() + print(info) + assert num_objects(info) == 0, info + + +if __name__ == "__main__": + import pytest + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/worker.py b/python/ray/worker.py index 34111a01d..eef923a34 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1549,10 +1549,7 @@ def put(value, weakref=False): except ObjectStoreFullError: logger.info( "Put failed since the value was either too large or the " - "store was full of pinned objects. If you are putting " - "and holding references to a lot of object ids, consider " - "ray.put(value, weakref=True) to allow object data to " - "be evicted early.") + "store was full of pinned objects.") raise return object_id diff --git a/src/ray/common/buffer.h b/src/ray/common/buffer.h index b61e06b00..7ed5db25a 100644 --- a/src/ray/common/buffer.h +++ b/src/ray/common/buffer.h @@ -116,7 +116,9 @@ class LocalMemoryBuffer : public Buffer { /// reference to a plasma object (via the underlying plasma::PlasmaBuffer). class PlasmaBuffer : public Buffer { public: - PlasmaBuffer(std::shared_ptr buffer) : buffer_(buffer) {} + PlasmaBuffer(std::shared_ptr buffer, + std::function on_delete = nullptr) + : buffer_(buffer), on_delete_(on_delete) {} uint8_t *Data() const override { return const_cast(buffer_->data()); } @@ -126,10 +128,18 @@ class PlasmaBuffer : public Buffer { bool IsPlasmaBuffer() const override { return true; } + ~PlasmaBuffer() { + if (on_delete_ != nullptr) { + on_delete_(this); + } + }; + private: /// shared_ptr to arrow buffer which can potentially hold a reference /// for the object (when it's a plasma::PlasmaBuffer). std::shared_ptr buffer_; + /// Callback to run on destruction. 
+ std::function on_delete_; }; } // namespace ray diff --git a/src/ray/common/function_descriptor.h b/src/ray/common/function_descriptor.h index 1f818bd66..3b21760c7 100644 --- a/src/ray/common/function_descriptor.h +++ b/src/ray/common/function_descriptor.h @@ -44,6 +44,9 @@ class FunctionDescriptorInterface : public MessageWrapper Subtype *As() { return reinterpret_cast(this); @@ -133,6 +136,11 @@ class PythonFunctionDescriptor : public FunctionDescriptorInterface { ", function_hash=" + typed_message_->function_hash() + "}"; } + virtual std::string CallSiteString() const { + return typed_message_->module_name() + "." + typed_message_->class_name() + "." + + typed_message_->function_name(); + } + std::string ModuleName() const { return typed_message_->module_name(); } std::string ClassName() const { return typed_message_->class_name(); } diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 7834cf686..5934813a4 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -74,6 +74,11 @@ RAY_CONFIG(bool, object_pinning_enabled, true) /// LRU evicted until it is out of scope on the CREATOR of the ObjectID. RAY_CONFIG(bool, distributed_ref_counting_enabled, false) +/// Whether to record the creation sites of object references. This adds more +/// information to `ray memory`, but introduces a little extra overhead when +/// creating object references. +RAY_CONFIG(bool, record_ref_creation_sites, true) + /// If object_pinning_enabled is on, then objects that have been unpinned are /// added to a local cache. 
When the cache is flushed, all objects in the cache /// will be eagerly evicted in a batch by freeing all plasma copies in the diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index ab369b7ce..79b6c0ab3 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -275,4 +275,18 @@ std::string TaskSpecification::DebugString() const { return stream.str(); } +std::string TaskSpecification::CallSiteString() const { + std::ostringstream stream; + auto desc = FunctionDescriptor(); + if (IsActorCreationTask()) { + stream << "(deserialize actor creation task arg) "; + } else if (IsActorTask()) { + stream << "(deserialize actor task arg) "; + } else { + stream << "(deserialize task arg) "; + } + stream << FunctionDescriptor()->CallSiteString(); + return stream.str(); +} + } // namespace ray diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 23c229a4b..a4f26ef61 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -171,6 +171,9 @@ class TaskSpecification : public MessageWrapper { std::string DebugString() const; + // A one-word summary of the task func as a call site (e.g., __main__.foo). 
+ std::string CallSiteString() const; + static SchedulingClassDescriptor &GetSchedulingClassDescriptor(SchedulingClass id); private: diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 6ada5b66a..aaede2c53 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -84,13 +84,17 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, int node_manager_port, const TaskExecutionCallback &task_execution_callback, std::function check_signals, - std::function gc_collect, bool ref_counting_enabled) + std::function gc_collect, + std::function get_lang_stack, + bool ref_counting_enabled) : worker_type_(worker_type), language_(language), log_dir_(log_dir), ref_counting_enabled_(ref_counting_enabled), check_signals_(check_signals), gc_collect_(gc_collect), + get_call_site_(RayConfig::instance().record_ref_creation_sites() ? get_lang_stack + : nullptr), worker_context_(worker_type, job_id), io_work_(io_service_), client_call_manager_(new rpc::ClientCallManager(io_service_)), @@ -188,7 +192,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider( store_socket, local_raylet_client_, check_signals_, /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), - boost::bind(&CoreWorker::TriggerGlobalGC, this))); + boost::bind(&CoreWorker::TriggerGlobalGC, this), + boost::bind(&CoreWorker::CurrentCallSite, this))); memory_store_.reset(new CoreWorkerMemoryStore( [this](const RayObject &obj, const ObjectID &obj_id) { RAY_LOG(DEBUG) << "Promoting object to plasma " << obj_id; @@ -424,7 +429,7 @@ Status CoreWorker::Put(const RayObject &object, worker_context_.GetNextPutIndex(), static_cast(TaskTransportType::DIRECT)); reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(), - rpc_address_); + rpc_address_, CurrentCallSite(), object.GetSize()); return Put(object, 
contained_object_ids, *object_id, /*pin_object=*/true); } @@ -465,7 +470,8 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t // Only add the object to the reference counter if it didn't already exist. if (data) { reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(), - rpc_address_); + rpc_address_, CurrentCallSite(), + data_size + metadata->Size()); } return Status::OK(); } @@ -773,7 +779,8 @@ Status CoreWorker::SubmitTask(const RayFunction &function, return_ids); TaskSpecification task_spec = builder.Build(); if (task_options.is_direct_call) { - task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, max_retries); + task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, + CurrentCallSite(), max_retries); return direct_task_submitter_->SubmitTask(task_spec); } else { return local_raylet_client_->SubmitTask(task_spec); @@ -811,7 +818,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, Status status; if (actor_creation_options.is_direct_call) { task_manager_->AddPendingTask( - GetCallerId(), rpc_address_, task_spec, + GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), std::max(RayConfig::instance().actor_creation_min_retries(), actor_creation_options.max_reconstructions)); status = direct_task_submitter_->SubmitTask(task_spec); @@ -864,7 +871,8 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f Status status; TaskSpecification task_spec = builder.Build(); if (is_direct_call) { - task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec); + task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, + CurrentCallSite()); if (actor_handle->IsDead()) { auto status = Status::IOError("sent task to dead actor"); task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED, @@ -923,7 +931,7 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, bool is_owner_handle) { const auto &actor_id 
= actor_handle->GetActorID(); const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); - reference_counter_->AddLocalReference(actor_creation_return_id); + reference_counter_->AddLocalReference(actor_creation_return_id, CurrentCallSite()); bool inserted; { @@ -1088,7 +1096,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, // Pin the borrowed IDs for the duration of the task. for (const auto &borrowed_id : borrowed_ids) { RAY_LOG(DEBUG) << "Incrementing ref for borrowed ID " << borrowed_id; - reference_counter_->AddLocalReference(borrowed_id); + reference_counter_->AddLocalReference(borrowed_id, task_spec.CallSiteString()); } const auto transport_type = worker_context_.CurrentTaskIsDirectCall() @@ -1325,7 +1333,7 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques if (task_manager_->IsTaskPending(object_id.TaskId())) { // Acquire a reference and retry. This prevents the object from being // evicted out from under us before we can start the get. - AddLocalReference(object_id); + AddLocalReference(object_id, ""); if (task_manager_->IsTaskPending(object_id.TaskId())) { // The task is pending. Send the reply once the task finishes. 
memory_store_->GetAsync(object_id, @@ -1446,6 +1454,12 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & MemoryStoreStats memory_store_stats = memory_store_->GetMemoryStoreStatisticalData(); stats->set_num_local_objects(memory_store_stats.num_local_objects); stats->set_used_object_store_memory(memory_store_stats.used_object_store_memory); + + if (request.include_memory_info()) { + reference_counter_->AddObjectRefStats(plasma_store_provider_->UsedObjectsList(), + stats); + } + send_reply_callback(Status::OK(), nullptr, nullptr); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index b481c2909..a07d3a5ff 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -91,6 +91,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { int node_manager_port, const TaskExecutionCallback &task_execution_callback, std::function check_signals = nullptr, std::function gc_collect = nullptr, + std::function get_lang_stack = nullptr, bool ref_counting_enabled = false); virtual ~CoreWorker(); @@ -125,7 +126,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] object_id The object ID to increase the reference count for. void AddLocalReference(const ObjectID &object_id) { - reference_counter_->AddLocalReference(object_id); + AddLocalReference(object_id, CurrentCallSite()); } /// Decrease the reference count for this object ID. Should be called @@ -579,6 +580,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Private methods related to task submission. /// + /// Increase the local reference count for this object ID. Should be called + /// by the language frontend when a new reference is created. + /// + /// \param[in] object_id The object ID to increase the reference count for. + /// \param[in] call_site The call site from the language frontend. 
+ void AddLocalReference(const ObjectID &object_id, std::string call_site) { + reference_counter_->AddLocalReference(object_id, call_site); + } + /// Give this worker a handle to an actor. /// /// This handle will remain as long as the current actor or task is @@ -684,6 +694,18 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// be held up in garbage objects. std::function gc_collect_; + /// Callback to get the current language (e.g., Python) call site. + std::function get_call_site_; + + // Convenience method to get the current language call site. + std::string CurrentCallSite() { + std::string call_site; + if (get_call_site_ != nullptr) { + get_call_site_(&call_site); + } + return call_site; + } + /// Shared state of the worker. Includes process-level and thread-level state. /// TODO(edoakes): we should move process-level state into this class and make /// this a ThreadContext. diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 25f71f04b..fc670432f 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -86,10 +86,50 @@ bool ReferenceCounter::AddBorrowedObjectInternal(const ObjectID &object_id, return true; } +void ReferenceCounter::AddObjectRefStats( + const absl::flat_hash_map> pinned_objects, + rpc::CoreWorkerStats *stats) const { + absl::MutexLock lock(&mutex_); + for (const auto &ref : object_id_refs_) { + auto ref_proto = stats->add_object_refs(); + ref_proto->set_object_id(ref.first.Binary()); + ref_proto->set_call_site(ref.second.call_site); + ref_proto->set_object_size(ref.second.object_size); + ref_proto->set_local_ref_count(ref.second.local_ref_count); + ref_proto->set_submitted_task_ref_count(ref.second.submitted_task_ref_count); + auto it = pinned_objects.find(ref.first); + if (it != pinned_objects.end()) { + ref_proto->set_pinned_in_memory(true); + // If some info isn't available, fallback to getting it from the pinned info. 
+ if (ref.second.object_size <= 0) { + ref_proto->set_object_size(it->second.first); + } + if (ref.second.call_site.empty()) { + ref_proto->set_call_site(it->second.second); + } + } + for (const auto &obj_id : ref.second.contained_in_owned) { + ref_proto->add_contained_in_owned(obj_id.Binary()); + } + } + // Also include any unreferenced objects that are pinned in memory. + for (const auto &entry : pinned_objects) { + if (object_id_refs_.find(entry.first) == object_id_refs_.end()) { + auto ref_proto = stats->add_object_refs(); + ref_proto->set_object_id(entry.first.Binary()); + ref_proto->set_object_size(entry.second.first); + ref_proto->set_call_site(entry.second.second); + ref_proto->set_pinned_in_memory(true); + } + } +} + void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, const std::vector &inner_ids, const TaskID &owner_id, - const rpc::Address &owner_address) { + const rpc::Address &owner_address, + const std::string &call_site, + const int64_t object_size) { RAY_LOG(DEBUG) << "Adding owned object " << object_id; absl::MutexLock lock(&mutex_); RAY_CHECK(object_id_refs_.count(object_id) == 0) @@ -97,7 +137,8 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, // If the entry doesn't exist, we initialize the direct reference count to zero // because this corresponds to a submitted task whose return ObjectID will be created // in the frontend language, incrementing the reference count. - object_id_refs_.emplace(object_id, Reference(owner_id, owner_address)); + object_id_refs_.emplace(object_id, + Reference(owner_id, owner_address, call_site, object_size)); if (!inner_ids.empty()) { // Mark that this object ID contains other inner IDs. Then, we will not GC // the inner objects until the outer object ID goes out of scope. 
@@ -105,12 +146,21 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, } } -void ReferenceCounter::AddLocalReference(const ObjectID &object_id) { +void ReferenceCounter::UpdateObjectSize(const ObjectID &object_id, int64_t object_size) { + absl::MutexLock lock(&mutex_); + auto it = object_id_refs_.find(object_id); + if (it != object_id_refs_.end()) { + it->second.object_size = object_size; + } +} + +void ReferenceCounter::AddLocalReference(const ObjectID &object_id, + const std::string &call_site) { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); if (it == object_id_refs_.end()) { // NOTE: ownership info for these objects must be added later via AddBorrowedObject. - it = object_id_refs_.emplace(object_id, Reference()).first; + it = object_id_refs_.emplace(object_id, Reference(call_site, -1)).first; } it->second.local_ref_count++; RAY_LOG(DEBUG) << "Add local reference " << object_id; diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index f80653bdc..95255c17b 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -51,7 +51,8 @@ class ReferenceCounter { /// any owner information, since we don't know how it was created. /// /// \param[in] object_id The object to to increment the count for. - void AddLocalReference(const ObjectID &object_id) LOCKS_EXCLUDED(mutex_); + void AddLocalReference(const ObjectID &object_id, const std::string &call_site) + LOCKS_EXCLUDED(mutex_); /// Decrease the local reference count for the ObjectID by one. /// @@ -106,7 +107,15 @@ class ReferenceCounter { /// \param[in] dependencies The objects that the object depends on. 
void AddOwnedObject(const ObjectID &object_id, const std::vector &contained_ids, const TaskID &owner_id, - const rpc::Address &owner_address) LOCKS_EXCLUDED(mutex_); + const rpc::Address &owner_address, const std::string &call_site, + const int64_t object_size) LOCKS_EXCLUDED(mutex_); + + /// Update the size of the object. + /// + /// \param[in] object_id The ID of the object. + /// \param[in] object_size The known size of the object. + void UpdateObjectSize(const ObjectID &object_id, int64_t object_size) + LOCKS_EXCLUDED(mutex_); /// Add an object that we are borrowing. /// @@ -226,13 +235,26 @@ class ReferenceCounter { /// \return Whether we have a reference to the object ID. bool HasReference(const ObjectID &object_id) const LOCKS_EXCLUDED(mutex_); + /// Write the current reference table to the given proto. + /// + /// \param[out] stats The proto to write references to. + void AddObjectRefStats( + const absl::flat_hash_map> pinned_objects, + rpc::CoreWorkerStats *stats) const LOCKS_EXCLUDED(mutex_); + private: struct Reference { /// Constructor for a reference whose origin is unknown. - Reference() : owned_by_us(false) {} + Reference() {} + Reference(std::string call_site, const int64_t object_size) + : call_site(call_site), object_size(object_size) {} /// Constructor for a reference that we created. - Reference(const TaskID &owner_id, const rpc::Address &owner_address) - : owned_by_us(true), owner({owner_id, owner_address}) {} + Reference(const TaskID &owner_id, const rpc::Address &owner_address, + std::string call_site, const int64_t object_size) + : call_site(call_site), + object_size(object_size), + owned_by_us(true), + owner({owner_id, owner_address}) {} /// Constructor from a protobuf. This is assumed to be a message from /// another process, so the object defaults to not being owned by us. @@ -264,10 +286,15 @@ class ReferenceCounter { was_stored_in_objects); } + /// Description of the call site where the reference was created.
+ std::string call_site = ""; + /// Object size if known, otherwise -1; + int64_t object_size = -1; + /// Whether we own the object. If we own the object, then we are /// responsible for tracking the state of the task that creates the object /// (see task_manager.h). - bool owned_by_us; + bool owned_by_us = false; /// The object's owner, if we know it. This has no value if the object is /// if we do not know the object's owner (because distributed ref counting /// is not yet implemented). diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index 283044eb2..9a4244647 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -98,18 +98,18 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { // The below methods mirror a core worker's operations, e.g., `Put` simulates // a ray.put(). void Put(const ObjectID &object_id) { - rc_.AddOwnedObject(object_id, {}, task_id_, address_); - rc_.AddLocalReference(object_id); + rc_.AddOwnedObject(object_id, {}, task_id_, address_, "", 0); + rc_.AddLocalReference(object_id, ""); } void PutWrappedId(const ObjectID outer_id, const ObjectID &inner_id) { - rc_.AddOwnedObject(outer_id, {inner_id}, task_id_, address_); - rc_.AddLocalReference(outer_id); + rc_.AddOwnedObject(outer_id, {inner_id}, task_id_, address_, "", 0); + rc_.AddLocalReference(outer_id, ""); } void GetSerializedObjectId(const ObjectID outer_id, const ObjectID &inner_id, const TaskID &owner_id, const rpc::Address &owner_address) { - rc_.AddLocalReference(inner_id); + rc_.AddLocalReference(inner_id, ""); rc_.AddBorrowedObject(inner_id, outer_id, owner_id, owner_address); } @@ -117,16 +117,16 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { const TaskID &owner_id, const rpc::Address &owner_address) { // Add a sentinel reference to keep the argument ID in scope even though // the frontend won't have a reference. 
- rc_.AddLocalReference(arg_id); + rc_.AddLocalReference(arg_id, ""); GetSerializedObjectId(arg_id, inner_id, owner_id, owner_address); } ObjectID SubmitTaskWithArg(const ObjectID &arg_id) { rc_.UpdateSubmittedTaskReferences({arg_id}); ObjectID return_id = ObjectID::FromRandom(); - rc_.AddOwnedObject(return_id, {}, task_id_, address_); + rc_.AddOwnedObject(return_id, {}, task_id_, address_, "", 0); // Add a sentinel reference to keep all nested object IDs in scope. - rc_.AddLocalReference(return_id); + rc_.AddLocalReference(return_id, ""); return return_id; } @@ -183,9 +183,9 @@ TEST_F(ReferenceCountTest, TestBasic) { ObjectID id2 = ObjectID::FromRandom(); // Local references. - rc->AddLocalReference(id1); - rc->AddLocalReference(id1); - rc->AddLocalReference(id2); + rc->AddLocalReference(id1, ""); + rc->AddLocalReference(id1, ""); + rc->AddLocalReference(id2, ""); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); rc->RemoveLocalReference(id1, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); @@ -214,9 +214,9 @@ TEST_F(ReferenceCountTest, TestBasic) { out.clear(); // Local & submitted task references. - rc->AddLocalReference(id1); + rc->AddLocalReference(id1, ""); rc->UpdateSubmittedTaskReferences({id1, id2}); - rc->AddLocalReference(id2); + rc->AddLocalReference(id2, ""); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); rc->RemoveLocalReference(id1, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); @@ -233,6 +233,36 @@ TEST_F(ReferenceCountTest, TestBasic) { out.clear(); } +// Tests call site tracking and ability to update object size. 
+TEST_F(ReferenceCountTest, TestReferenceStats) { + ObjectID id1 = ObjectID::FromRandom(); + ObjectID id2 = ObjectID::FromRandom(); + TaskID task_id = TaskID::ForFakeTask(); + rpc::Address address; + address.set_ip_address("1234"); + + rc->AddLocalReference(id1, "file.py:42"); + rc->UpdateObjectSize(id1, 200); + + rpc::CoreWorkerStats stats; + rc->AddObjectRefStats({}, &stats); + ASSERT_EQ(stats.object_refs_size(), 1); + ASSERT_EQ(stats.object_refs(0).object_id(), id1.Binary()); + ASSERT_EQ(stats.object_refs(0).local_ref_count(), 1); + ASSERT_EQ(stats.object_refs(0).object_size(), 200); + ASSERT_EQ(stats.object_refs(0).call_site(), "file.py:42"); + rc->RemoveLocalReference(id1, nullptr); + + rc->AddOwnedObject(id2, {}, task_id, address, "file2.py:43", 100); + rpc::CoreWorkerStats stats2; + rc->AddObjectRefStats({}, &stats2); + ASSERT_EQ(stats2.object_refs_size(), 1); + ASSERT_EQ(stats2.object_refs(0).object_id(), id2.Binary()); + ASSERT_EQ(stats2.object_refs(0).local_ref_count(), 0); + ASSERT_EQ(stats2.object_refs(0).object_size(), 100); + ASSERT_EQ(stats2.object_refs(0).call_site(), "file2.py:43"); +} + // Tests that we can get the owner address correctly for objects that we own, // objects that we borrowed via a serialized object ID, and objects whose // origin we do not know. 
@@ -241,7 +271,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) { TaskID task_id = TaskID::ForFakeTask(); rpc::Address address; address.set_ip_address("1234"); - rc->AddOwnedObject(object_id, {}, task_id, address); + rc->AddOwnedObject(object_id, {}, task_id, address, "", 0); TaskID added_id; rpc::Address added_address; @@ -252,14 +282,14 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) { auto object_id2 = ObjectID::FromRandom(); task_id = TaskID::ForFakeTask(); address.set_ip_address("5678"); - rc->AddOwnedObject(object_id2, {}, task_id, address); + rc->AddOwnedObject(object_id2, {}, task_id, address, "", 0); ASSERT_TRUE(rc->GetOwner(object_id2, &added_id, &added_address)); ASSERT_EQ(task_id, added_id); ASSERT_EQ(address.ip_address(), added_address.ip_address()); auto object_id3 = ObjectID::FromRandom(); ASSERT_FALSE(rc->GetOwner(object_id3, &added_id, &added_address)); - rc->AddLocalReference(object_id3); + rc->AddLocalReference(object_id3, ""); ASSERT_FALSE(rc->GetOwner(object_id3, &added_id, &added_address)); } @@ -280,7 +310,7 @@ TEST(MemoryStoreIntegrationTest, TestSimple) { ASSERT_EQ(store.Size(), 0); // Tests ref counting overrides remove after get option. 
- rc->AddLocalReference(id1); + rc->AddLocalReference(id1, ""); RAY_CHECK_OK(store.Put(buffer, id1)); ASSERT_EQ(store.Size(), 1); std::vector> results; diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 7b8481326..0fc6792a7 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -25,11 +25,18 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( const std::string &store_socket, const std::shared_ptr raylet_client, std::function check_signals, bool evict_if_full, - std::function on_store_full) + std::function on_store_full, + std::function get_current_call_site) : raylet_client_(raylet_client), check_signals_(check_signals), evict_if_full_(evict_if_full), on_store_full_(on_store_full) { + if (get_current_call_site != nullptr) { + get_current_call_site_ = get_current_call_site; + } else { + get_current_call_site_ = []() { return ""; }; + } + buffer_tracker_ = std::make_shared(); RAY_ARROW_CHECK_OK(store_client_.Connect(store_socket)); } @@ -110,7 +117,10 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta } else { RAY_LOG(ERROR) << "Failed to put object " << object_id << " after " << (max_retries + 1) << " attempts. Plasma store status:\n" - << MemoryUsageString(); + << MemoryUsageString() << "\n---\n" + << "--- Tip: Use the `ray memory` command to list active objects " + "in the cluster." 
+ << "\n---\n"; } } else if (plasma::IsPlasmaObjectExists(plasma_status)) { RAY_LOG(WARNING) << "Trying to put an object that already existed in plasma: " @@ -171,7 +181,21 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( std::shared_ptr data = nullptr; std::shared_ptr metadata = nullptr; if (plasma_results[i].data && plasma_results[i].data->size()) { - data = std::make_shared(plasma_results[i].data); + // We track the set of active data buffers in active_buffers_. On destruction, + // the buffer entry will be removed from the set via callback. + std::shared_ptr tracker = buffer_tracker_; + data = std::make_shared( + plasma_results[i].data, [tracker, object_id](PlasmaBuffer *this_buffer) { + absl::MutexLock lock(&tracker->active_buffers_mutex_); + auto key = std::make_pair(object_id, this_buffer); + RAY_CHECK(tracker->active_buffers_.contains(key)); + tracker->active_buffers_.erase(key); + }); + auto call_site = get_current_call_site_(); + { + absl::MutexLock lock(&tracker->active_buffers_mutex_); + tracker->active_buffers_[std::make_pair(object_id, data.get())] = call_site; + } } if (plasma_results[i].metadata && plasma_results[i].metadata->size()) { metadata = std::make_shared(plasma_results[i].metadata); @@ -352,6 +376,23 @@ std::string CoreWorkerPlasmaStoreProvider::MemoryUsageString() { return store_client_.DebugString(); } +absl::flat_hash_map> +CoreWorkerPlasmaStoreProvider::UsedObjectsList() const { + absl::flat_hash_map> used; + absl::MutexLock lock(&buffer_tracker_->active_buffers_mutex_); + for (const auto &entry : buffer_tracker_->active_buffers_) { + auto it = used.find(entry.first.first); + if (it != used.end()) { + // Prefer to keep entries that have non-empty callsites. 
+ if (!it->second.second.empty()) { + continue; + } + } + used[entry.first.first] = std::make_pair(entry.first.second->Size(), entry.second); + } + return used; +} + void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes( int num_attempts, const absl::flat_hash_set &remaining) { if (num_attempts % RayConfig::instance().object_store_get_warn_per_num_attempts() == diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 74045065a..ede38c244 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -33,10 +33,12 @@ namespace ray { /// See `CoreWorkerStoreProvider` for the semantics of public methods. class CoreWorkerPlasmaStoreProvider { public: - CoreWorkerPlasmaStoreProvider(const std::string &store_socket, - const std::shared_ptr raylet_client, - std::function check_signals, bool evict_if_full, - std::function on_store_full = nullptr); + CoreWorkerPlasmaStoreProvider( + const std::string &store_socket, + const std::shared_ptr raylet_client, + std::function check_signals, bool evict_if_full, + std::function on_store_full = nullptr, + std::function get_current_call_site = nullptr); ~CoreWorkerPlasmaStoreProvider(); @@ -95,6 +97,11 @@ class CoreWorkerPlasmaStoreProvider { Status Delete(const absl::flat_hash_set &object_ids, bool local_only, bool delete_creating_tasks); + /// Lists objects in use (pinned) by the current client. + /// + /// \return Output mapping of used object ids to (size, call site). + absl::flat_hash_map> UsedObjectsList() const; + std::string MemoryUsageString(); private: @@ -136,6 +143,24 @@ class CoreWorkerPlasmaStoreProvider { std::function check_signals_; const bool evict_if_full_; std::function on_store_full_; + std::function get_current_call_site_; + + // Active buffers tracker.
This must be allocated as a separate structure since its + // lifetime can exceed that of the store provider due to callback references. + struct BufferTracker { + // Guards the active buffers map. This mutex may be acquired during PlasmaBuffer + // destruction. + mutable absl::Mutex active_buffers_mutex_; + // Mapping of live object buffers to their creation call site. Destroyed buffers are + // automatically removed from this list via destructor callback. The map key uniquely + // identifies a buffer. It should not be a shared ptr since that would keep the Buffer + // alive forever (i.e., this is a weak ref map). + absl::flat_hash_map, std::string> active_buffers_ + GUARDED_BY(active_buffers_mutex_); + }; + + // Pointer to the shared buffer tracker. + std::shared_ptr buffer_tracker_; }; } // namespace ray diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 40eca7787..1acbcacfa 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -26,7 +26,8 @@ const int64_t kTaskFailureLoggingFrequencyMillis = 5000; void TaskManager::AddPendingTask(const TaskID &caller_id, const rpc::Address &caller_address, - const TaskSpecification &spec, int max_retries) { + const TaskSpecification &spec, + const std::string &call_site, int max_retries) { RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId(); absl::MutexLock lock(&mu_); std::pair entry = {spec, max_retries}; @@ -67,7 +68,8 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, // the inner IDs. Note that this RPC can be received *before* the // PushTaskReply. 
reference_counter_->AddOwnedObject(spec.ReturnId(i, TaskTransportType::DIRECT), - /*inner_ids=*/{}, caller_id, caller_address); + /*inner_ids=*/{}, caller_id, caller_address, + call_site, -1); } } @@ -107,6 +109,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, for (int i = 0; i < reply.return_objects_size(); i++) { const auto &return_object = reply.return_objects(i); ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); + reference_counter_->UpdateObjectSize(object_id, return_object.size()); if (return_object.in_plasma()) { // Mark it as in plasma with a dummy object. diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index ce2fd0d16..cea06bc98 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -64,7 +64,8 @@ class TaskManager : public TaskFinisherInterface { /// on failure. /// \return Void. void AddPendingTask(const TaskID &caller_id, const rpc::Address &caller_address, - const TaskSpecification &spec, int max_retries = 0); + const TaskSpecification &spec, const std::string &call_site, + int max_retries = 0); /// Wait for all pending tasks to finish, and then shutdown. 
/// diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index 291f8143c..3345d4304 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -69,7 +69,7 @@ TEST_F(TaskManagerTest, TestTaskSuccess) { ObjectID dep2 = ObjectID::FromRandom(); auto spec = CreateTaskHelper(1, {dep1, dep2}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - manager_.AddPendingTask(caller_id, caller_address, spec); + manager_.AddPendingTask(caller_id, caller_address, spec, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); @@ -95,7 +95,7 @@ TEST_F(TaskManagerTest, TestTaskSuccess) { ASSERT_EQ(num_retries_, 0); std::vector removed; - reference_counter_->AddLocalReference(return_id); + reference_counter_->AddLocalReference(return_id, ""); reference_counter_->RemoveLocalReference(return_id, &removed); ASSERT_EQ(removed[0], return_id); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); @@ -109,7 +109,7 @@ TEST_F(TaskManagerTest, TestTaskFailure) { ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); auto spec = CreateTaskHelper(1, {dep1, dep2}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - manager_.AddPendingTask(caller_id, caller_address, spec); + manager_.AddPendingTask(caller_id, caller_address, spec, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); @@ -130,7 +130,7 @@ TEST_F(TaskManagerTest, TestTaskFailure) { ASSERT_EQ(num_retries_, 0); std::vector removed; - reference_counter_->AddLocalReference(return_id); + reference_counter_->AddLocalReference(return_id, ""); reference_counter_->RemoveLocalReference(return_id, &removed); ASSERT_EQ(removed[0], return_id); 
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); @@ -145,7 +145,7 @@ TEST_F(TaskManagerTest, TestTaskRetry) { auto spec = CreateTaskHelper(1, {dep1, dep2}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, num_retries); + manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); @@ -174,7 +174,7 @@ TEST_F(TaskManagerTest, TestTaskRetry) { ASSERT_EQ(stored_error, error); std::vector removed; - reference_counter_->AddLocalReference(return_id); + reference_counter_->AddLocalReference(return_id, ""); reference_counter_->RemoveLocalReference(return_id, &removed); ASSERT_EQ(removed[0], return_id); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 1c62d7ab9..6f85992ff 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -264,6 +264,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( // The object is nullptr if it already existed in the object store. const auto &result = return_objects[i]; + return_object->set_size(result->GetSize()); if (result->GetData() != nullptr && result->GetData()->IsPlasmaBuffer()) { return_object->set_in_plasma(true); } else { diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 8dc4cde43..97797b6c8 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -224,6 +224,24 @@ message ViewData { repeated Measure measures = 2; } +// Debug info for a referenced object. +message ObjectRefInfo { + // Object id that is referenced. 
+ bytes object_id = 1; + // Language call site of the object reference (i.e., file and line number). + string call_site = 2; + // Size of the object if this core worker is the owner, otherwise -1. + int64 object_size = 3; + // Number of local references to the object. + int64 local_ref_count = 4; + // Number of references in submitted tasks. + int64 submitted_task_ref_count = 5; + // Object ids that contain this object. + repeated bytes contained_in_owned = 6; + // True if this object is pinned in memory by the current process. + bool pinned_in_memory = 7; +} + // Debug info returned from the core worker. message CoreWorkerStats { // Debug string of the currently executing task. @@ -254,4 +272,6 @@ message CoreWorkerStats { int32 num_executed_tasks = 14; // Actor constructor. string actor_title = 15; + // Local reference table. + repeated ObjectRefInfo object_refs = 16; } diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index d418497b0..cec8754a3 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -85,6 +85,8 @@ message ReturnObject { bytes metadata = 4; // ObjectIDs that were nested in data. This is only set for inlined objects. repeated bytes nested_inlined_ids = 5; + // Size of this object. + int64 size = 6; } message PushTaskRequest { @@ -182,6 +184,9 @@ message KillActorReply { message GetCoreWorkerStatsRequest { // The ID of the worker this message is intended for. bytes intended_worker_id = 1; + // Whether to include memory stats. This could be large since it includes + // metadata for all live object references. 
+ bool include_memory_info = 2; } message GetCoreWorkerStatsReply { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index b31c22943..b05ee78ac 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -70,6 +70,9 @@ message PinObjectIDsReply { } message GetNodeStatsRequest { + // Whether to include memory stats. This could be large since it includes + // metadata for all live object references. + bool include_memory_info = 1; } message WorkerStats { @@ -79,6 +82,8 @@ message WorkerStats { bool is_driver = 2; // Debug information returned from the core worker. CoreWorkerStats core_worker_stats = 3; + // Error string if fetching core worker stats failed. + string fetch_error = 4; } message GetNodeStatsReply { @@ -95,6 +100,15 @@ message GlobalGCRequest { message GlobalGCReply { } +message FormatGlobalMemoryInfoRequest { +} + +message FormatGlobalMemoryInfoReply { + // A tabular summary of the memory stats. To get this data in structured form, you + // can instead use GetNodeStats() directly. + string memory_summary = 1; +} + // Service for inter-node-manager communication. service NodeManagerService { // Request a worker from the raylet. @@ -109,4 +123,7 @@ service NodeManagerService { rpc GetNodeStats(GetNodeStatsRequest) returns (GetNodeStatsReply); // Trigger garbage collection in all workers across the cluster. rpc GlobalGC(GlobalGCRequest) returns (GlobalGCReply); + // Get global object reference stats in formatted form. 
+ rpc FormatGlobalMemoryInfo(FormatGlobalMemoryInfoRequest) + returns (FormatGlobalMemoryInfoReply); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 87f1051e7..31a0e10e0 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3299,14 +3299,9 @@ void NodeManager::FlushObjectsToFree() { last_free_objects_at_ms_ = current_time_ms(); } -void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request, +void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request, rpc::GetNodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { - for (const auto &driver : worker_pool_.GetAllDrivers()) { - auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(driver->GetProcess().GetId()); - worker_stats->set_is_driver(true); - } // NOTE(sang): Currently reporting only infeasible/ready ActorCreationTask // because Ray dashboard only renders actorCreationTask as of Feb 3 2020. // TODO(sang): Support dashboard for non-ActorCreationTask. @@ -3371,33 +3366,138 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request, // HandleGetNodeStats should set a timeout so that the rpc finishes even if not all // workers have replied. 
auto all_workers = worker_pool_.GetAllWorkers(); + absl::flat_hash_set driver_ids; + for (auto driver : worker_pool_.GetAllDrivers()) { + all_workers.push_back(driver); + driver_ids.insert(driver->WorkerId()); + } for (const auto &worker : all_workers) { rpc::GetCoreWorkerStatsRequest request; request.set_intended_worker_id(worker->WorkerId().Binary()); + request.set_include_memory_info(node_stats_request.include_memory_info()); auto status = worker->rpc_client()->GetCoreWorkerStats( - request, [reply, worker, all_workers, send_reply_callback]( + request, [reply, worker, all_workers, driver_ids, send_reply_callback]( const ray::Status &status, const rpc::GetCoreWorkerStatsReply &r) { - if (!status.ok()) { - RAY_LOG(WARNING) << "Failed to send get core worker stats request: " - << status.ToString(); - } else { - auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(worker->GetProcess().GetId()); - worker_stats->set_is_driver(false); - reply->set_num_workers(reply->num_workers() + 1); + auto worker_stats = reply->add_workers_stats(); + worker_stats->set_pid(worker->GetProcess().GetId()); + worker_stats->set_is_driver(driver_ids.contains(worker->WorkerId())); + reply->set_num_workers(reply->num_workers() + 1); + if (status.ok()) { worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats()); - if (reply->num_workers() == all_workers.size()) { - send_reply_callback(Status::OK(), nullptr, nullptr); - } + } else { + RAY_LOG(ERROR) << "Failed to send get core worker stats request: " + << status.ToString(); + worker_stats->set_fetch_error(status.ToString()); + } + if (reply->num_workers() == all_workers.size()) { + send_reply_callback(Status::OK(), nullptr, nullptr); } }); if (!status.ok()) { - RAY_LOG(WARNING) << "Failed to send get core worker stats request: " - << status.ToString(); + RAY_LOG(ERROR) << "Failed to send get core worker stats request: " + << status.ToString(); } } } +std::string FormatMemoryInfo( + std::vector> node_stats) 
{ + // First pass to compute object sizes. + absl::flat_hash_map object_sizes; + for (const auto &reply : node_stats) { + for (const auto &worker_stats : reply->workers_stats()) { + for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) { + auto obj_id = ObjectID::FromBinary(object_ref.object_id()); + if (object_ref.object_size() > 0) { + object_sizes[obj_id] = object_ref.object_size(); + } + } + } + } + + std::ostringstream builder; + builder + << "----------------------------------------------------------------------------" + "-------------------------\n"; + builder + << " Object ID Reference Type Object Size " + " Reference Creation Site\n"; + builder + << "============================================================================" + "=========================\n"; + + // Second pass builds the summary string for each node. + for (const auto &reply : node_stats) { + for (const auto &worker_stats : reply->workers_stats()) { + bool pid_printed = false; + for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) { + if (!object_ref.pinned_in_memory() && object_ref.local_ref_count() == 0 && + object_ref.submitted_task_ref_count() == 0 && + object_ref.contained_in_owned_size() == 0) { + continue; + } + if (!pid_printed) { + if (worker_stats.is_driver()) { + builder << "; driver pid=" << worker_stats.pid() << "\n"; + } else { + builder << "; worker pid=" << worker_stats.pid() << "\n"; + } + pid_printed = true; + } + auto obj_id = ObjectID::FromBinary(object_ref.object_id()); + builder << obj_id.Hex() << " "; + // TODO(ekl) we could convey more information about the reference status. 
+ if (object_ref.pinned_in_memory()) { + builder << "PINNED_IN_MEMORY "; + } else if (object_ref.submitted_task_ref_count() > 0) { + builder << "USED_BY_PENDING_TASK "; + } else if (object_ref.local_ref_count() > 0) { + builder << "LOCAL_REFERENCE "; + } else if (object_ref.contained_in_owned_size() > 0) { + builder << "CAPTURED_IN_OBJECT "; + } else { + builder << "UNKNOWN_STATUS "; + } + builder << std::right << std::setfill(' ') << std::setw(11); + if (object_sizes.contains(obj_id)) { + builder << object_sizes[obj_id]; + } else { + builder << " ?"; + } + builder << " " << object_ref.call_site(); + builder << "\n"; + } + } + } + builder + << "----------------------------------------------------------------------------" + "-------------------------\n"; + + return builder.str(); +} + +void NodeManager::HandleFormatGlobalMemoryInfo( + const rpc::FormatGlobalMemoryInfoRequest &request, + rpc::FormatGlobalMemoryInfoReply *reply, rpc::SendReplyCallback send_reply_callback) { + std::vector> replies; + + auto local_request = std::make_shared(); + auto local_reply = std::make_shared(); + local_request->set_include_memory_info(true); + + // TODO(ekl): for (const auto &entry : remote_node_manager_clients_) {} + // to handle remote nodes + + HandleGetNodeStats(*local_request, local_reply.get(), + [local_request, local_reply, replies, reply, send_reply_callback]( + Status status, std::function success, + std::function failure) mutable { + replies.push_back(local_reply); + reply->set_memory_summary(FormatMemoryInfo(replies)); + send_reply_callback(Status::OK(), nullptr, nullptr); + }); +} + void NodeManager::HandleGlobalGC(const rpc::GlobalGCRequest &request, rpc::GlobalGCReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index a01a81566..4ad52a777 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -596,6 +596,11 @@ class NodeManager : public 
rpc::NodeManagerServiceHandler { void HandleGlobalGC(const rpc::GlobalGCRequest &request, rpc::GlobalGCReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `FormatGlobalMemoryInfo` request. + void HandleFormatGlobalMemoryInfo(const rpc::FormatGlobalMemoryInfoRequest &request, + rpc::FormatGlobalMemoryInfoReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Trigger local GC on each worker of this raylet. void DoLocalGC(); diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index 068b9f6a8..3fd91fb44 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -30,7 +30,8 @@ namespace rpc { RPC_SERVICE_HANDLER(NodeManagerService, ForwardTask) \ RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs) \ RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats) \ - RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC) + RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC) \ + RPC_SERVICE_HANDLER(NodeManagerService, FormatGlobalMemoryInfo) /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`. class NodeManagerServiceHandler { @@ -68,6 +69,10 @@ class NodeManagerServiceHandler { virtual void HandleGlobalGC(const GlobalGCRequest &request, GlobalGCReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleFormatGlobalMemoryInfo(const FormatGlobalMemoryInfoRequest &request, + FormatGlobalMemoryInfoReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeManagerService`.
diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc index 8e0395aa5..2834ece33 100644 --- a/src/ray/util/process.cc +++ b/src/ray/util/process.cc @@ -294,7 +294,7 @@ void Process::Kill() { } #endif if (error) { - RAY_LOG(ERROR) << "Failed to kill processs " << pid << " with error " << error + RAY_LOG(ERROR) << "Failed to kill process " << pid << " with error " << error << ": " << error.message(); } } else {