mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 21:08:50 +08:00
Move profiling to c++ (#5771)
* Move profiling to c++ * comments * Fix tests * Start after constructor * fix comment * always init logging * Fix logging * fix logging issue * shared_ptr for profiler * DEBUG -> WARNING * fix killed_ init * Fix flaky checkpointing tests * Fix checkpoint test logic * Fix exception matching * timeout exception * Fix import * fix build * use boost::asio * fix double const * Properly reset async_wait * remove SIGINT * Change error message * increase timeout * small nits * Don't trap on SIGINT * -v for tune * Fix test
This commit is contained in:
+18
-50
@@ -30,6 +30,7 @@ from ray.includes.common cimport (
|
||||
CTaskArg,
|
||||
CRayFunction,
|
||||
LocalMemoryBuffer,
|
||||
move,
|
||||
LANGUAGE_CPP,
|
||||
LANGUAGE_JAVA,
|
||||
LANGUAGE_PYTHON,
|
||||
@@ -74,6 +75,7 @@ include "includes/ray_config.pxi"
|
||||
include "includes/task.pxi"
|
||||
include "includes/buffer.pxi"
|
||||
include "includes/common.pxi"
|
||||
include "includes/libcoreworker.pxi"
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -329,50 +331,6 @@ cdef class RayletClient:
|
||||
error_message.encode("ascii"),
|
||||
timestamp))
|
||||
|
||||
def push_profile_events(self, component_type, UniqueID component_id,
|
||||
node_ip_address, profile_data):
|
||||
cdef:
|
||||
GCSProfileTableData profile_info
|
||||
GCSProfileEvent *profile_event
|
||||
c_string event_type
|
||||
|
||||
if len(profile_data) == 0:
|
||||
return # Short circuit if there are no profile events.
|
||||
|
||||
profile_info.set_component_type(component_type.encode("ascii"))
|
||||
profile_info.set_component_id(component_id.binary())
|
||||
profile_info.set_node_ip_address(node_ip_address.encode("ascii"))
|
||||
|
||||
for py_profile_event in profile_data:
|
||||
profile_event = profile_info.add_profile_events()
|
||||
if not isinstance(py_profile_event, dict):
|
||||
raise TypeError(
|
||||
"Incorrect type for a profile event. Expected dict "
|
||||
"instead of '%s'" % str(type(py_profile_event)))
|
||||
# TODO(rkn): If the dictionary is formatted incorrectly, that
|
||||
# could lead to errors. E.g., if any of the strings are empty,
|
||||
# that will cause segfaults in the node manager.
|
||||
for key_string, event_data in py_profile_event.items():
|
||||
if key_string == "event_type":
|
||||
if len(event_data) == 0:
|
||||
raise ValueError(
|
||||
"'event_type' should not be a null string.")
|
||||
profile_event.set_event_type(event_data.encode("ascii"))
|
||||
elif key_string == "start_time":
|
||||
profile_event.set_start_time(float(event_data))
|
||||
elif key_string == "end_time":
|
||||
profile_event.set_end_time(float(event_data))
|
||||
elif key_string == "extra_data":
|
||||
if len(event_data) == 0:
|
||||
raise ValueError(
|
||||
"'extra_data' should not be a null string.")
|
||||
profile_event.set_extra_data(event_data.encode("ascii"))
|
||||
else:
|
||||
raise ValueError(
|
||||
"Unknown profile event key '%s'" % key_string)
|
||||
|
||||
check_status(self.client.PushProfileEvents(profile_info))
|
||||
|
||||
def prepare_actor_checkpoint(self, ActorID actor_id):
|
||||
cdef:
|
||||
CActorCheckpointID checkpoint_id
|
||||
@@ -408,16 +366,18 @@ cdef class CoreWorker:
|
||||
cdef unique_ptr[CCoreWorker] core_worker
|
||||
|
||||
def __cinit__(self, is_driver, store_socket, raylet_socket,
|
||||
JobID job_id, GcsClientOptions gcs_options, log_dir):
|
||||
JobID job_id, GcsClientOptions gcs_options, log_dir,
|
||||
node_ip_address):
|
||||
assert pyarrow is not None, ("Expected pyarrow to be imported from "
|
||||
"outside _raylet. See __init__.py for "
|
||||
"details.")
|
||||
|
||||
self.core_worker.reset(new CCoreWorker(
|
||||
WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER,
|
||||
LANGUAGE_PYTHON, store_socket.encode("ascii"),
|
||||
raylet_socket.encode("ascii"), job_id.native(),
|
||||
gcs_options.native()[0], log_dir.encode("utf-8"), NULL, False))
|
||||
|
||||
assert pyarrow is not None, ("Expected pyarrow to be imported from "
|
||||
"outside _raylet. See __init__.py for "
|
||||
"details.")
|
||||
gcs_options.native()[0], log_dir.encode("utf-8"),
|
||||
node_ip_address.encode("utf-8"), NULL, False))
|
||||
|
||||
def disconnect(self):
|
||||
with nogil:
|
||||
@@ -620,3 +580,11 @@ cdef class CoreWorker:
|
||||
message = self.core_worker.get().Objects().MemoryUsageString()
|
||||
|
||||
return message.decode("utf-8")
|
||||
|
||||
def profile_event(self, event_type, dict extra_data):
|
||||
cdef:
|
||||
c_string c_event_type = event_type.encode("ascii")
|
||||
|
||||
return ProfileEvent.make(
|
||||
self.core_worker.get().CreateProfileEvent(c_event_type),
|
||||
extra_data)
|
||||
|
||||
@@ -16,6 +16,25 @@ from ray.includes.unique_ids cimport (
|
||||
)
|
||||
|
||||
|
||||
cdef extern from * namespace "polyfill":
|
||||
"""
|
||||
namespace polyfill {
|
||||
|
||||
template <typename T>
|
||||
inline typename std::remove_reference<T>::type&& move(T& t) {
|
||||
return std::move(t);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
inline typename std::remove_reference<T>::type&& move(T&& t) {
|
||||
return std::move(t);
|
||||
}
|
||||
|
||||
} // namespace polyfill
|
||||
"""
|
||||
cdef T move[T](T)
|
||||
|
||||
|
||||
cdef extern from "ray/common/status.h" namespace "ray" nogil:
|
||||
cdef cppclass StatusCode:
|
||||
pass
|
||||
|
||||
@@ -25,6 +25,10 @@ from ray.includes.common cimport (
|
||||
from ray.includes.libraylet cimport CRayletClient
|
||||
|
||||
|
||||
cdef extern from "ray/core_worker/profiling.h" nogil:
|
||||
cdef cppclass CProfileEvent "ray::worker::ProfileEvent":
|
||||
void SetExtraData(const c_string &extra_data)
|
||||
|
||||
cdef extern from "ray/core_worker/task_interface.h" namespace "ray" nogil:
|
||||
cdef cppclass CTaskSubmissionInterface "CoreWorkerTaskInterface":
|
||||
CRayStatus SubmitTask(
|
||||
@@ -63,7 +67,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
const c_string &store_socket,
|
||||
const c_string &raylet_socket, const CJobID &job_id,
|
||||
const CGcsClientOptions &gcs_options,
|
||||
const c_string log_dir, void* execution_callback,
|
||||
const c_string &log_dir, const c_string &node_ip_address,
|
||||
void* execution_callback,
|
||||
c_bool use_memory_store_)
|
||||
void Disconnect()
|
||||
CWorkerType &GetWorkerType()
|
||||
@@ -71,6 +76,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
CObjectInterface &Objects()
|
||||
CTaskSubmissionInterface &Tasks()
|
||||
# CTaskExecutionInterface &Execution()
|
||||
unique_ptr[CProfileEvent] CreateProfileEvent(
|
||||
const c_string &event_type)
|
||||
|
||||
# TODO(edoakes): remove this once the raylet client is no longer used
|
||||
# directly.
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
from libcpp.string cimport string as c_string
|
||||
|
||||
from ray.includes.libcoreworker cimport CProfileEvent
|
||||
|
||||
import json
|
||||
import traceback
|
||||
|
||||
cdef class ProfileEvent:
|
||||
"""Cython wrapper class of C++ `ray::worker::ProfileEvent`."""
|
||||
cdef:
|
||||
unique_ptr[CProfileEvent] inner
|
||||
dict extra_data
|
||||
|
||||
@staticmethod
|
||||
cdef make(unique_ptr[CProfileEvent] event, dict extra_data):
|
||||
cdef ProfileEvent self = ProfileEvent.__new__(ProfileEvent)
|
||||
self.inner = move(event)
|
||||
self.extra_data = extra_data
|
||||
return self
|
||||
|
||||
def set_extra_data(self, c_string extra_data):
|
||||
self.inner.get().SetExtraData(extra_data)
|
||||
|
||||
def __enter__(self):
|
||||
pass
|
||||
|
||||
def __exit__(self, type, value, tb):
|
||||
extra_data = {}
|
||||
if type is not None:
|
||||
extra_data = {
|
||||
"type": str(type),
|
||||
"value": str(value),
|
||||
"traceback": str(traceback.format_exc()),
|
||||
}
|
||||
elif self.extra_data is not None:
|
||||
extra_data = self.extra_data
|
||||
|
||||
self.inner.get().SetExtraData(json.dumps(extra_data).encode("ascii"))
|
||||
|
||||
# Deleting the CProfileEvent will add it to a queue to be pushed to
|
||||
# the driver.
|
||||
self.inner.reset()
|
||||
+3
-146
@@ -2,17 +2,8 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
import ray
|
||||
|
||||
LOG_POINT = 0
|
||||
LOG_SPAN_START = 1
|
||||
LOG_SPAN_END = 2
|
||||
|
||||
|
||||
class _NullLogSpan(object):
|
||||
"""A log span context manager that does nothing"""
|
||||
@@ -58,140 +49,6 @@ def profile(event_type, extra_data=None):
|
||||
An object that can profile a span of time via a "with" statement.
|
||||
"""
|
||||
worker = ray.worker.global_worker
|
||||
return RayLogSpanRaylet(worker.profiler, event_type, extra_data=extra_data)
|
||||
|
||||
|
||||
class Profiler(object):
|
||||
"""A class that holds the profiling states.
|
||||
|
||||
Attributes:
|
||||
worker: the worker to profile.
|
||||
events: the buffer of events.
|
||||
lock: the lock to protect access of events.
|
||||
threads_stopped (threading.Event): A threading event used to signal to
|
||||
the thread that it should exit.
|
||||
"""
|
||||
|
||||
def __init__(self, worker, threads_stopped):
|
||||
self.worker = worker
|
||||
self.events = []
|
||||
self.lock = threading.Lock()
|
||||
self.threads_stopped = threads_stopped
|
||||
|
||||
def start_flush_thread(self):
|
||||
self.t = threading.Thread(
|
||||
target=self._periodically_flush_profile_events,
|
||||
name="ray_push_profiling_information")
|
||||
# Making the thread a daemon causes it to exit when the main thread
|
||||
# exits.
|
||||
self.t.daemon = True
|
||||
self.t.start()
|
||||
|
||||
def join_flush_thread(self):
|
||||
"""Wait for the flush thread to exit."""
|
||||
self.t.join()
|
||||
|
||||
def _periodically_flush_profile_events(self):
|
||||
"""Drivers run this as a thread to flush profile data in the
|
||||
background."""
|
||||
# Note(rkn): This is run on a background thread in the driver. It uses
|
||||
# the raylet client. This should be ok because it doesn't read
|
||||
# from the raylet client and we have the GIL here. However,
|
||||
# if either of those things changes, then we could run into issues.
|
||||
while True:
|
||||
# Sleep for 1 second. This will be interrupted if
|
||||
# self.threads_stopped is set.
|
||||
self.threads_stopped.wait(timeout=1)
|
||||
|
||||
# Exit if we received a signal that we should stop.
|
||||
if self.threads_stopped.is_set():
|
||||
return
|
||||
|
||||
self.flush_profile_data()
|
||||
|
||||
def flush_profile_data(self):
|
||||
"""Push the logged profiling data to the global control store."""
|
||||
with self.lock:
|
||||
events = self.events
|
||||
self.events = []
|
||||
|
||||
if self.worker.mode == ray.WORKER_MODE:
|
||||
component_type = "worker"
|
||||
else:
|
||||
component_type = "driver"
|
||||
|
||||
self.worker.raylet_client.push_profile_events(
|
||||
component_type, ray.UniqueID(self.worker.worker_id),
|
||||
self.worker.node_ip_address, events)
|
||||
|
||||
def add_event(self, event):
|
||||
with self.lock:
|
||||
self.events.append(event)
|
||||
|
||||
|
||||
class RayLogSpanRaylet(object):
|
||||
"""An object used to enable logging a span of events with a with statement.
|
||||
|
||||
Attributes:
|
||||
event_type (str): The type of the event being logged.
|
||||
extra_data: Additional information to log.
|
||||
"""
|
||||
|
||||
def __init__(self, profiler, event_type, extra_data=None):
|
||||
"""Initialize a RayLogSpanRaylet object."""
|
||||
self.profiler = profiler
|
||||
self.event_type = event_type
|
||||
self.extra_data = extra_data if extra_data is not None else {}
|
||||
|
||||
def set_attribute(self, key, value):
|
||||
"""Add a key-value pair to the extra_data dict.
|
||||
|
||||
This can be used to add attributes that are not available when
|
||||
ray.profile was called.
|
||||
|
||||
Args:
|
||||
key: The attribute name.
|
||||
value: The attribute value.
|
||||
"""
|
||||
if not isinstance(key, str) or not isinstance(value, str):
|
||||
raise ValueError("The arguments 'key' and 'value' must both be "
|
||||
"strings. Instead they are {} and {}.".format(
|
||||
key, value))
|
||||
self.extra_data[key] = value
|
||||
|
||||
def __enter__(self):
|
||||
"""Log the beginning of a span event.
|
||||
|
||||
Returns:
|
||||
The object itself is returned so that if the block is opened using
|
||||
"with ray.profile(...) as prof:", we can call
|
||||
"prof.set_attribute" inside the block.
|
||||
"""
|
||||
self.start_time = time.time()
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, tb):
|
||||
"""Log the end of a span event. Log any exception that occurred."""
|
||||
for key, value in self.extra_data.items():
|
||||
if not isinstance(key, str) or not isinstance(value, str):
|
||||
raise ValueError("The extra_data argument must be a "
|
||||
"dictionary mapping strings to strings. "
|
||||
"Instead it is {}.".format(self.extra_data))
|
||||
|
||||
if type is not None:
|
||||
extra_data = json.dumps({
|
||||
"type": str(type),
|
||||
"value": str(value),
|
||||
"traceback": str(traceback.format_exc()),
|
||||
})
|
||||
else:
|
||||
extra_data = json.dumps(self.extra_data)
|
||||
|
||||
event = {
|
||||
"event_type": self.event_type,
|
||||
"start_time": self.start_time,
|
||||
"end_time": time.time(),
|
||||
"extra_data": extra_data,
|
||||
}
|
||||
|
||||
self.profiler.add_event(event)
|
||||
if worker.mode == ray.worker.LOCAL_MODE:
|
||||
return NULL_LOG_SPAN
|
||||
return worker.core_worker.profile_event(event_type, extra_data)
|
||||
|
||||
@@ -2660,14 +2660,14 @@ def test_init_exception_in_checkpointable_actor(ray_start_regular,
|
||||
a = CheckpointableFailedActor.remote()
|
||||
|
||||
# Make sure that we get errors from a failed constructor.
|
||||
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1, timeout=2)
|
||||
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1)
|
||||
errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
|
||||
assert len(errors) == 1
|
||||
assert error_message1 in errors[0]["message"]
|
||||
|
||||
# Make sure that we get errors from a failed method.
|
||||
a.fail_method.remote()
|
||||
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2, timeout=2)
|
||||
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2)
|
||||
errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
|
||||
assert len(errors) == 2
|
||||
assert error_message1 in errors[1]["message"]
|
||||
|
||||
@@ -1205,10 +1205,8 @@ def test_running_function_on_all_workers(ray_start_regular):
|
||||
def test_profiling_api(ray_start_2_cpus):
|
||||
@ray.remote
|
||||
def f():
|
||||
with ray.profile(
|
||||
"custom_event",
|
||||
extra_data={"name": "custom name"}) as ray_prof:
|
||||
ray_prof.set_attribute("key", "value")
|
||||
with ray.profile("custom_event", extra_data={"name": "custom name"}):
|
||||
pass
|
||||
|
||||
ray.put(1)
|
||||
object_id = f.remote()
|
||||
@@ -1220,10 +1218,6 @@ def test_profiling_api(ray_start_2_cpus):
|
||||
timeout_seconds = 20
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if time.time() - start_time > timeout_seconds:
|
||||
raise RayTestTimeoutException(
|
||||
"Timed out while waiting for information in "
|
||||
"profile table.")
|
||||
profile_data = ray.timeline()
|
||||
event_types = {event["cat"] for event in profile_data}
|
||||
expected_types = [
|
||||
@@ -1246,6 +1240,15 @@ def test_profiling_api(ray_start_2_cpus):
|
||||
for expected_type in expected_types):
|
||||
break
|
||||
|
||||
if time.time() - start_time > timeout_seconds:
|
||||
raise RayTestTimeoutException(
|
||||
"Timed out while waiting for information in "
|
||||
"profile table. Missing events: {}.".format(
|
||||
set(expected_types) - set(event_types)))
|
||||
|
||||
# The profiling information only flushes once every second.
|
||||
time.sleep(1.1)
|
||||
|
||||
|
||||
def test_wait_cluster(ray_start_cluster):
|
||||
cluster = ray_start_cluster
|
||||
|
||||
+1
-11
@@ -126,7 +126,6 @@ class Worker(object):
|
||||
WORKER_MODE.
|
||||
cached_functions_to_run (List): A list of functions to run on all of
|
||||
the workers that should be exported as soon as connect is called.
|
||||
profiler: the profiler used to aggregate profiling information.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
@@ -146,7 +145,6 @@ class Worker(object):
|
||||
# When the worker is constructed. Record the original value of the
|
||||
# CUDA_VISIBLE_DEVICES environment variable.
|
||||
self.original_gpu_ids = ray.utils.get_cuda_visible_devices()
|
||||
self.profiler = None
|
||||
self.memory_monitor = memory_monitor.MemoryMonitor()
|
||||
# A dictionary that maps from driver id to SerializationContext
|
||||
# TODO: clean up the SerializationContext once the job finished.
|
||||
@@ -1832,8 +1830,6 @@ def connect(node,
|
||||
if not faulthandler.is_enabled():
|
||||
faulthandler.enable(all_threads=False)
|
||||
|
||||
worker.profiler = profiling.Profiler(worker, worker.threads_stopped)
|
||||
|
||||
if mode is not LOCAL_MODE:
|
||||
# Create a Redis client to primary.
|
||||
# The Redis client can safely be shared between threads. However,
|
||||
@@ -1973,6 +1969,7 @@ def connect(node,
|
||||
worker.current_job_id,
|
||||
gcs_options,
|
||||
node.get_logs_dir_path(),
|
||||
node.node_ip_address,
|
||||
)
|
||||
worker.task_context.current_task_id = (
|
||||
worker.core_worker.get_current_task_id())
|
||||
@@ -2022,11 +2019,6 @@ def connect(node,
|
||||
worker.logger_thread.daemon = True
|
||||
worker.logger_thread.start()
|
||||
|
||||
# If we are using the raylet code path and we are not in local mode, start
|
||||
# a background thread to periodically flush profiling data to the GCS.
|
||||
if mode != LOCAL_MODE:
|
||||
worker.profiler.start_flush_thread()
|
||||
|
||||
if mode == SCRIPT_MODE:
|
||||
# Add the directory containing the script that is running to the Python
|
||||
# paths of the workers. Also add the current directory. Note that this
|
||||
@@ -2069,8 +2061,6 @@ def disconnect():
|
||||
worker.threads_stopped.set()
|
||||
if hasattr(worker, "import_thread"):
|
||||
worker.import_thread.join_import_thread()
|
||||
if hasattr(worker, "profiler") and hasattr(worker.profiler, "t"):
|
||||
worker.profiler.join_flush_thread()
|
||||
if hasattr(worker, "listener_thread"):
|
||||
worker.listener_thread.join()
|
||||
if hasattr(worker, "printer_thread"):
|
||||
|
||||
Reference in New Issue
Block a user