diff --git a/.travis.yml b/.travis.yml index ab1562bd0..0da63f9e7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -170,7 +170,7 @@ script: # ray tune tests - if [ $RAY_CI_TUNE_AFFECTED == "1" ]; then ./ci/suppress_output python python/ray/tune/tests/test_dependency.py; fi # `cluster_tests.py` runs on Jenkins, not Travis. - - if [ $RAY_CI_TUNE_AFFECTED == "1" ]; then python -m pytest --durations=10 --timeout=300 --ignore=python/ray/tune/tests/test_cluster.py --ignore=python/ray/tune/tests/test_tune_restore.py --ignore=python/ray/tune/tests/test_actor_reuse.py python/ray/tune/tests; fi + - if [ $RAY_CI_TUNE_AFFECTED == "1" ]; then python -m pytest -v --durations=10 --timeout=300 --ignore=python/ray/tune/tests/test_cluster.py --ignore=python/ray/tune/tests/test_tune_restore.py --ignore=python/ray/tune/tests/test_actor_reuse.py python/ray/tune/tests; fi # ray tests # Python3.5+ only. Otherwise we will get `SyntaxError` regardless of how we set the tester. diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index fc992de3a..f55a207dd 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -30,6 +30,7 @@ from ray.includes.common cimport ( CTaskArg, CRayFunction, LocalMemoryBuffer, + move, LANGUAGE_CPP, LANGUAGE_JAVA, LANGUAGE_PYTHON, @@ -74,6 +75,7 @@ include "includes/ray_config.pxi" include "includes/task.pxi" include "includes/buffer.pxi" include "includes/common.pxi" +include "includes/libcoreworker.pxi" logger = logging.getLogger(__name__) @@ -329,50 +331,6 @@ cdef class RayletClient: error_message.encode("ascii"), timestamp)) - def push_profile_events(self, component_type, UniqueID component_id, - node_ip_address, profile_data): - cdef: - GCSProfileTableData profile_info - GCSProfileEvent *profile_event - c_string event_type - - if len(profile_data) == 0: - return # Short circuit if there are no profile events. - - profile_info.set_component_type(component_type.encode("ascii")) - profile_info.set_component_id(component_id.binary()) - profile_info.set_node_ip_address(node_ip_address.encode("ascii")) - - for py_profile_event in profile_data: - profile_event = profile_info.add_profile_events() - if not isinstance(py_profile_event, dict): - raise TypeError( - "Incorrect type for a profile event. Expected dict " - "instead of '%s'" % str(type(py_profile_event))) - # TODO(rkn): If the dictionary is formatted incorrectly, that - # could lead to errors. E.g., if any of the strings are empty, - # that will cause segfaults in the node manager. - for key_string, event_data in py_profile_event.items(): - if key_string == "event_type": - if len(event_data) == 0: - raise ValueError( - "'event_type' should not be a null string.") - profile_event.set_event_type(event_data.encode("ascii")) - elif key_string == "start_time": - profile_event.set_start_time(float(event_data)) - elif key_string == "end_time": - profile_event.set_end_time(float(event_data)) - elif key_string == "extra_data": - if len(event_data) == 0: - raise ValueError( - "'extra_data' should not be a null string.") - profile_event.set_extra_data(event_data.encode("ascii")) - else: - raise ValueError( - "Unknown profile event key '%s'" % key_string) - - check_status(self.client.PushProfileEvents(profile_info)) - def prepare_actor_checkpoint(self, ActorID actor_id): cdef: CActorCheckpointID checkpoint_id @@ -408,16 +366,18 @@ cdef class CoreWorker: cdef unique_ptr[CCoreWorker] core_worker def __cinit__(self, is_driver, store_socket, raylet_socket, - JobID job_id, GcsClientOptions gcs_options, log_dir): + JobID job_id, GcsClientOptions gcs_options, log_dir, + node_ip_address): + assert pyarrow is not None, ("Expected pyarrow to be imported from " + "outside _raylet. See __init__.py for " + "details.") + self.core_worker.reset(new CCoreWorker( WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER, LANGUAGE_PYTHON, store_socket.encode("ascii"), raylet_socket.encode("ascii"), job_id.native(), - gcs_options.native()[0], log_dir.encode("utf-8"), NULL, False)) - - assert pyarrow is not None, ("Expected pyarrow to be imported from " - "outside _raylet. See __init__.py for " - "details.") + gcs_options.native()[0], log_dir.encode("utf-8"), + node_ip_address.encode("utf-8"), NULL, False)) def disconnect(self): with nogil: @@ -620,3 +580,11 @@ cdef class CoreWorker: message = self.core_worker.get().Objects().MemoryUsageString() return message.decode("utf-8") + + def profile_event(self, event_type, dict extra_data): + cdef: + c_string c_event_type = event_type.encode("ascii") + + return ProfileEvent.make( + self.core_worker.get().CreateProfileEvent(c_event_type), + extra_data) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index beb716ac2..7050b8be8 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -16,6 +16,25 @@ from ray.includes.unique_ids cimport ( ) +cdef extern from * namespace "polyfill": + """ + namespace polyfill { + + template + inline typename std::remove_reference::type&& move(T& t) { + return std::move(t); + } + + template + inline typename std::remove_reference::type&& move(T&& t) { + return std::move(t); + } + + } // namespace polyfill + """ + cdef T move[T](T) + + cdef extern from "ray/common/status.h" namespace "ray" nogil: cdef cppclass StatusCode: pass diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index f45af2bbf..0d7766db3 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -25,6 +25,10 @@ from ray.includes.common cimport ( from ray.includes.libraylet cimport CRayletClient +cdef extern from "ray/core_worker/profiling.h" nogil: + cdef cppclass CProfileEvent "ray::worker::ProfileEvent": + void SetExtraData(const c_string &extra_data) + cdef extern from "ray/core_worker/task_interface.h" namespace "ray" nogil: cdef cppclass CTaskSubmissionInterface "CoreWorkerTaskInterface": CRayStatus SubmitTask( @@ -63,7 +67,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_string &store_socket, const c_string &raylet_socket, const CJobID &job_id, const CGcsClientOptions &gcs_options, - const c_string log_dir, void* execution_callback, + const c_string &log_dir, const c_string &node_ip_address, + void* execution_callback, c_bool use_memory_store_) void Disconnect() CWorkerType &GetWorkerType() @@ -71,6 +76,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CObjectInterface &Objects() CTaskSubmissionInterface &Tasks() # CTaskExecutionInterface &Execution() + unique_ptr[CProfileEvent] CreateProfileEvent( + const c_string &event_type) # TODO(edoakes): remove this once the raylet client is no longer used # directly. diff --git a/python/ray/includes/libcoreworker.pxi b/python/ray/includes/libcoreworker.pxi new file mode 100644 index 000000000..3060a2788 --- /dev/null +++ b/python/ray/includes/libcoreworker.pxi @@ -0,0 +1,42 @@ +from libcpp.string cimport string as c_string + +from ray.includes.libcoreworker cimport CProfileEvent + +import json +import traceback + +cdef class ProfileEvent: + """Cython wrapper class of C++ `ray::worker::ProfileEvent`.""" + cdef: + unique_ptr[CProfileEvent] inner + dict extra_data + + @staticmethod + cdef make(unique_ptr[CProfileEvent] event, dict extra_data): + cdef ProfileEvent self = ProfileEvent.__new__(ProfileEvent) + self.inner = move(event) + self.extra_data = extra_data + return self + + def set_extra_data(self, c_string extra_data): + self.inner.get().SetExtraData(extra_data) + + def __enter__(self): + pass + + def __exit__(self, type, value, tb): + extra_data = {} + if type is not None: + extra_data = { + "type": str(type), + "value": str(value), + "traceback": str(traceback.format_exc()), + } + elif self.extra_data is not None: + extra_data = self.extra_data + + self.inner.get().SetExtraData(json.dumps(extra_data).encode("ascii")) + + # Deleting the CProfileEvent will add it to a queue to be pushed to + # the driver. + self.inner.reset() diff --git a/python/ray/profiling.py b/python/ray/profiling.py index 4ace00954..65a9c3b17 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -2,17 +2,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import json -import time -import threading -import traceback - import ray -LOG_POINT = 0 -LOG_SPAN_START = 1 -LOG_SPAN_END = 2 - class _NullLogSpan(object): """A log span context manager that does nothing""" @@ -58,140 +49,6 @@ def profile(event_type, extra_data=None): An object that can profile a span of time via a "with" statement. """ worker = ray.worker.global_worker - return RayLogSpanRaylet(worker.profiler, event_type, extra_data=extra_data) - - -class Profiler(object): - """A class that holds the profiling states. - - Attributes: - worker: the worker to profile. - events: the buffer of events. - lock: the lock to protect access of events. - threads_stopped (threading.Event): A threading event used to signal to - the thread that it should exit. - """ - - def __init__(self, worker, threads_stopped): - self.worker = worker - self.events = [] - self.lock = threading.Lock() - self.threads_stopped = threads_stopped - - def start_flush_thread(self): - self.t = threading.Thread( - target=self._periodically_flush_profile_events, - name="ray_push_profiling_information") - # Making the thread a daemon causes it to exit when the main thread - # exits. - self.t.daemon = True - self.t.start() - - def join_flush_thread(self): - """Wait for the flush thread to exit.""" - self.t.join() - - def _periodically_flush_profile_events(self): - """Drivers run this as a thread to flush profile data in the - background.""" - # Note(rkn): This is run on a background thread in the driver. It uses - # the raylet client. This should be ok because it doesn't read - # from the raylet client and we have the GIL here. However, - # if either of those things changes, then we could run into issues. - while True: - # Sleep for 1 second. This will be interrupted if - # self.threads_stopped is set. - self.threads_stopped.wait(timeout=1) - - # Exit if we received a signal that we should stop. - if self.threads_stopped.is_set(): - return - - self.flush_profile_data() - - def flush_profile_data(self): - """Push the logged profiling data to the global control store.""" - with self.lock: - events = self.events - self.events = [] - - if self.worker.mode == ray.WORKER_MODE: - component_type = "worker" - else: - component_type = "driver" - - self.worker.raylet_client.push_profile_events( - component_type, ray.UniqueID(self.worker.worker_id), - self.worker.node_ip_address, events) - - def add_event(self, event): - with self.lock: - self.events.append(event) - - -class RayLogSpanRaylet(object): - """An object used to enable logging a span of events with a with statement. - - Attributes: - event_type (str): The type of the event being logged. - extra_data: Additional information to log. - """ - - def __init__(self, profiler, event_type, extra_data=None): - """Initialize a RayLogSpanRaylet object.""" - self.profiler = profiler - self.event_type = event_type - self.extra_data = extra_data if extra_data is not None else {} - - def set_attribute(self, key, value): - """Add a key-value pair to the extra_data dict. - - This can be used to add attributes that are not available when - ray.profile was called. - - Args: - key: The attribute name. - value: The attribute value. - """ - if not isinstance(key, str) or not isinstance(value, str): - raise ValueError("The arguments 'key' and 'value' must both be " - "strings. Instead they are {} and {}.".format( - key, value)) - self.extra_data[key] = value - - def __enter__(self): - """Log the beginning of a span event. - - Returns: - The object itself is returned so that if the block is opened using - "with ray.profile(...) as prof:", we can call - "prof.set_attribute" inside the block. - """ - self.start_time = time.time() - return self - - def __exit__(self, type, value, tb): - """Log the end of a span event. Log any exception that occurred.""" - for key, value in self.extra_data.items(): - if not isinstance(key, str) or not isinstance(value, str): - raise ValueError("The extra_data argument must be a " - "dictionary mapping strings to strings. " - "Instead it is {}.".format(self.extra_data)) - - if type is not None: - extra_data = json.dumps({ - "type": str(type), - "value": str(value), - "traceback": str(traceback.format_exc()), - }) - else: - extra_data = json.dumps(self.extra_data) - - event = { - "event_type": self.event_type, - "start_time": self.start_time, - "end_time": time.time(), - "extra_data": extra_data, - } - - self.profiler.add_event(event) + if worker.mode == ray.worker.LOCAL_MODE: + return NULL_LOG_SPAN + return worker.core_worker.profile_event(event_type, extra_data) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 20fabd48d..2623bc7af 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -2660,14 +2660,14 @@ def test_init_exception_in_checkpointable_actor(ray_start_regular, a = CheckpointableFailedActor.remote() # Make sure that we get errors from a failed constructor. - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1, timeout=2) + wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1) errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) assert len(errors) == 1 assert error_message1 in errors[0]["message"] # Make sure that we get errors from a failed method. a.fail_method.remote() - wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2, timeout=2) + wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2) errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) assert len(errors) == 2 assert error_message1 in errors[1]["message"] diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c8846bfc5..ed6fdebe5 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1205,10 +1205,8 @@ def test_running_function_on_all_workers(ray_start_regular): def test_profiling_api(ray_start_2_cpus): @ray.remote def f(): - with ray.profile( - "custom_event", - extra_data={"name": "custom name"}) as ray_prof: - ray_prof.set_attribute("key", "value") + with ray.profile("custom_event", extra_data={"name": "custom name"}): + pass ray.put(1) object_id = f.remote() @@ -1220,10 +1218,6 @@ def test_profiling_api(ray_start_2_cpus): timeout_seconds = 20 start_time = time.time() while True: - if time.time() - start_time > timeout_seconds: - raise RayTestTimeoutException( - "Timed out while waiting for information in " - "profile table.") profile_data = ray.timeline() event_types = {event["cat"] for event in profile_data} expected_types = [ @@ -1246,6 +1240,15 @@ def test_profiling_api(ray_start_2_cpus): for expected_type in expected_types): break + if time.time() - start_time > timeout_seconds: + raise RayTestTimeoutException( + "Timed out while waiting for information in " + "profile table. Missing events: {}.".format( + set(expected_types) - set(event_types))) + + # The profiling information only flushes once every second. + time.sleep(1.1) + def test_wait_cluster(ray_start_cluster): cluster = ray_start_cluster diff --git a/python/ray/worker.py b/python/ray/worker.py index bf74ebd42..2cbd18825 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -126,7 +126,6 @@ class Worker(object): WORKER_MODE. cached_functions_to_run (List): A list of functions to run on all of the workers that should be exported as soon as connect is called. - profiler: the profiler used to aggregate profiling information. """ def __init__(self): @@ -146,7 +145,6 @@ class Worker(object): # When the worker is constructed. Record the original value of the # CUDA_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray.utils.get_cuda_visible_devices() - self.profiler = None self.memory_monitor = memory_monitor.MemoryMonitor() # A dictionary that maps from driver id to SerializationContext # TODO: clean up the SerializationContext once the job finished. @@ -1832,8 +1830,6 @@ def connect(node, if not faulthandler.is_enabled(): faulthandler.enable(all_threads=False) - worker.profiler = profiling.Profiler(worker, worker.threads_stopped) - if mode is not LOCAL_MODE: # Create a Redis client to primary. # The Redis client can safely be shared between threads. However, @@ -1973,6 +1969,7 @@ def connect(node, worker.current_job_id, gcs_options, node.get_logs_dir_path(), + node.node_ip_address, ) worker.task_context.current_task_id = ( worker.core_worker.get_current_task_id()) @@ -2022,11 +2019,6 @@ def connect(node, worker.logger_thread.daemon = True worker.logger_thread.start() - # If we are using the raylet code path and we are not in local mode, start - # a background thread to periodically flush profiling data to the GCS. - if mode != LOCAL_MODE: - worker.profiler.start_flush_thread() - if mode == SCRIPT_MODE: # Add the directory containing the script that is running to the Python # paths of the workers. Also add the current directory. Note that this @@ -2069,8 +2061,6 @@ def disconnect(): worker.threads_stopped.set() if hasattr(worker, "import_thread"): worker.import_thread.join_import_thread() - if hasattr(worker, "profiler") and hasattr(worker.profiler, "t"): - worker.profiler.join_flush_thread() if hasattr(worker, "listener_thread"): worker.listener_thread.join() if hasattr(worker, "printer_thread"): diff --git a/src/ray/core_worker/common.cc b/src/ray/core_worker/common.cc new file mode 100644 index 000000000..9f389b9b7 --- /dev/null +++ b/src/ray/core_worker/common.cc @@ -0,0 +1,25 @@ +#include "ray/core_worker/common.h" + +namespace ray { + +std::string WorkerTypeString(WorkerType type) { + if (type == WorkerType::DRIVER) { + return "driver"; + } else if (type == WorkerType::WORKER) { + return "worker"; + } + RAY_CHECK(false); + return ""; +} + +std::string LanguageString(Language language) { + if (language == Language::PYTHON) { + return "python"; + } else if (language == Language::JAVA) { + return "java"; + } + RAY_CHECK(false); + return ""; +} + +} // namespace ray diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index f57ae3995..42c1e642a 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -12,6 +12,12 @@ namespace ray { using WorkerType = rpc::WorkerType; +// Return a string representation of the worker type. +std::string WorkerTypeString(WorkerType type); + +// Return a string representation of the language. +std::string LanguageString(Language language); + /// Information about a remote function. class RayFunction { public: diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index d552c59ff..19cb3b81f 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -39,19 +39,10 @@ class WorkerContext { int GetNextPutIndex(); private: - /// Type of the worker. const WorkerType worker_type_; - - /// ID for this worker. const WorkerID worker_id_; - - /// Job ID for this worker. JobID current_job_id_; - - /// ID of current actor. ActorID current_actor_id_; - - /// Whether current actor accepts direct calls. bool current_actor_use_direct_call_; private: diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ec794897c..61cfc95b8 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1,5 +1,7 @@ -#include "ray/core_worker/core_worker.h" +#include + #include "ray/core_worker/context.h" +#include "ray/core_worker/core_worker.h" namespace ray { @@ -7,7 +9,7 @@ CoreWorker::CoreWorker( const WorkerType worker_type, const Language language, const std::string &store_socket, const std::string &raylet_socket, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, - const std::string &log_dir, + const std::string &log_dir, const std::string &node_ip_address, const CoreWorkerTaskExecutionInterface::TaskExecutor &execution_callback, bool use_memory_store) : worker_type_(worker_type), @@ -20,25 +22,38 @@ CoreWorker::CoreWorker( // and cleaned up by the caller. if (!log_dir_.empty()) { std::stringstream app_name; - if (language_ == Language::PYTHON) { - app_name << "python-"; - } else if (language == Language::JAVA) { - app_name << "java-"; - } - if (worker_type_ == WorkerType::DRIVER) { - app_name << "core-driver-" << worker_context_.GetWorkerID(); - } else { - app_name << "core-worker-" << worker_context_.GetWorkerID(); - } + app_name << LanguageString(language_) << "-" << WorkerTypeString(worker_type_) << "-" + << worker_context_.GetWorkerID(); RayLog::StartRayLog(app_name.str(), RayLogLevel::INFO, log_dir_); RayLog::InstallFailureSignalHandler(); } + boost::asio::signal_set sigint(io_service_, SIGINT); + sigint.async_wait( + [](const boost::system::error_code &error, int signal_number) -> void { + if (!error) { + RAY_LOG(WARNING) << "Got SIGINT " << signal_number << ", ignoring it."; + } + }); + + boost::asio::signal_set sigterm(io_service_, SIGTERM); + sigterm.async_wait( + [this](const boost::system::error_code &error, int signal_number) -> void { + if (!error) { + RAY_LOG(WARNING) << "Got SIGTERM " << signal_number << ", shutting down."; + io_service_.stop(); + } + }); + // Initialize gcs client. gcs_client_ = std::unique_ptr(new gcs::RedisGcsClient(gcs_options)); RAY_CHECK_OK(gcs_client_->Connect(io_service_)); + // Initialize profiler. + profiler_ = std::unique_ptr( + new worker::Profiler(worker_context_, node_ip_address, io_service_, gcs_client_)); + object_interface_ = std::unique_ptr(new CoreWorkerObjectInterface( worker_context_, raylet_client_, store_socket, use_memory_store)); @@ -99,7 +114,7 @@ CoreWorker::~CoreWorker() { if (task_execution_interface_) { task_execution_interface_->Stop(); } - if (!log_dir_.empty()) { + if (log_dir_ != "") { RayLog::ShutDownRayLog(); } } @@ -115,4 +130,10 @@ void CoreWorker::Disconnect() { void CoreWorker::StartIOService() { io_service_.run(); } +std::unique_ptr CoreWorker::CreateProfileEvent( + const std::string &event_type) { + return std::unique_ptr( + new worker::ProfileEvent(profiler_, event_type)); +} + } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 829baac65..0364c6f2f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -5,6 +5,7 @@ #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/object_interface.h" +#include "ray/core_worker/profiling.h" #include "ray/core_worker/task_execution.h" #include "ray/core_worker/task_interface.h" #include "ray/gcs/redis_gcs_client.h" @@ -27,6 +28,7 @@ class CoreWorker { /// \param[in] gcs_options Options for the GCS client. /// \param[in] log_dir Directory to write logs to. If this is empty, logs /// won't be written to a file. + /// \param[in] node_ip_address IP address of the node. /// \param[in] execution_callback Language worker callback to execute tasks. /// \param[in] use_memory_store Whether or not to use the in-memory object store /// in addition to the plasma store. @@ -38,7 +40,7 @@ class CoreWorker { CoreWorker(const WorkerType worker_type, const Language language, const std::string &store_socket, const std::string &raylet_socket, const JobID &job_id, const gcs::GcsClientOptions &gcs_options, - const std::string &log_dir, + const std::string &log_dir, const std::string &node_ip_address, const CoreWorkerTaskExecutionInterface::TaskExecutor &execution_callback, bool use_memory_store = true); @@ -64,6 +66,9 @@ class CoreWorker { /// store. CoreWorkerObjectInterface &Objects() { return *object_interface_; } + /// Create a profile event with a reference to the core worker's profiler. + std::unique_ptr CreateProfileEvent(const std::string &event_type); + /// Return the `CoreWorkerTaskExecutionInterface` that contains methods related to /// task execution. CoreWorkerTaskExecutionInterface &Execution() { @@ -96,6 +101,7 @@ class CoreWorker { boost::asio::io_service::work io_work_; std::thread io_thread_; + std::unique_ptr profiler_; std::unique_ptr raylet_client_; std::unique_ptr gcs_client_; std::unique_ptr task_interface_; diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc index 084a622ac..b0128e5df 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc @@ -71,7 +71,8 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWork try { auto core_worker = new ray::CoreWorker( static_cast(workerMode), ::Language::JAVA, native_store_socket, - native_raylet_socket, job_id, gcs_client_options, /*log_dir=*/"", executor_func); + native_raylet_socket, job_id, gcs_client_options, /*log_dir=*/"", + /*node_ip_address=*/"", executor_func); return reinterpret_cast(core_worker); } catch (const std::exception &e) { std::ostringstream oss; diff --git a/src/ray/core_worker/profiling.cc b/src/ray/core_worker/profiling.cc new file mode 100644 index 000000000..36700b042 --- /dev/null +++ b/src/ray/core_worker/profiling.cc @@ -0,0 +1,53 @@ +#include + +#include "ray/core_worker/profiling.h" + +namespace ray { + +namespace worker { + +ProfileEvent::ProfileEvent(const std::unique_ptr &profiler, + const std::string &event_type) + : profiler_(profiler) { + rpc_event_.set_event_type(event_type); + rpc_event_.set_start_time(current_sys_time_seconds()); +} + +Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_address, + boost::asio::io_service &io_service, + std::unique_ptr &gcs_client) + : io_service_(io_service), + timer_(io_service_, boost::asio::chrono::seconds(1)), + gcs_client_(gcs_client) { + rpc_profile_data_.set_component_type(WorkerTypeString(worker_context.GetWorkerType())); + rpc_profile_data_.set_component_id(worker_context.GetWorkerID().Binary()); + rpc_profile_data_.set_node_ip_address(node_ip_address); + timer_.async_wait(boost::bind(&Profiler::FlushEvents, this)); +} + +void Profiler::AddEvent(const rpc::ProfileTableData::ProfileEvent &event) { + io_service_.post([this, event]() -> void { + rpc_profile_data_.add_profile_events()->CopyFrom(event); + }); +} + +void Profiler::FlushEvents() { + if (rpc_profile_data_.profile_events_size() != 0) { + // TODO(edoakes): this should be migrated to use the new GCS client interface + // instead of the raw table interface once it's ready. + if (!gcs_client_->profile_table().AddProfileEventBatch(rpc_profile_data_).ok()) { + RAY_LOG(WARNING) << "Failed to push profile events to GCS."; + } else { + RAY_LOG(DEBUG) << "Pushed " << rpc_profile_data_.profile_events_size() + << "events to GCS."; + } + rpc_profile_data_.clear_profile_events(); + } + // Reset the timer to 1 second from the previous expiration time to avoid drift. + timer_.expires_at(timer_.expiry() + boost::asio::chrono::seconds(1)); + timer_.async_wait(boost::bind(&Profiler::FlushEvents, this)); +} + +} // namespace worker + +} // namespace ray diff --git a/src/ray/core_worker/profiling.h b/src/ray/core_worker/profiling.h new file mode 100644 index 000000000..056571552 --- /dev/null +++ b/src/ray/core_worker/profiling.h @@ -0,0 +1,60 @@ +#ifndef RAY_CORE_WORKER_PROFILING_H +#define RAY_CORE_WORKER_PROFILING_H + +#include "ray/core_worker/context.h" +#include "ray/gcs/redis_gcs_client.h" +#include "ray/util/util.h" + +namespace ray { + +namespace worker { + +class Profiler { + public: + Profiler(WorkerContext &worker_context, const std::string &node_ip_address, + boost::asio::io_service &io_service, + std::unique_ptr &gcs_client); + + // Add an event to the queue to be flushed periodically. + void AddEvent(const rpc::ProfileTableData::ProfileEvent &event); + + private: + // Flush all of the events that have been added since last flush to the GCS. + void FlushEvents(); + + // ASIO IO service event loop. Must be started by the caller. + boost::asio::io_service &io_service_; + + // Timer used to periodically flush events to the GCS. + boost::asio::steady_timer timer_; + + // RPC message containing profiling data. Holds the queue of profile events + // until they are flushed. + rpc::ProfileTableData rpc_profile_data_; + + std::unique_ptr &gcs_client_; +}; + +class ProfileEvent { + public: + ProfileEvent(const std::unique_ptr &profiler, const std::string &event_type); + + ~ProfileEvent() { + rpc_event_.set_end_time(current_sys_time_seconds()); + profiler_->AddEvent(rpc_event_); + } + + void SetExtraData(const std::string &extra_data) { + rpc_event_.set_extra_data(extra_data); + } + + private: + const std::unique_ptr &profiler_; + rpc::ProfileTableData::ProfileEvent rpc_event_; +}; + +} // namespace worker + +} // namespace ray + +#endif // RAY_CORE_WORKER_PROFILING_H diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 8d075f7b8..bb3f2acb4 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -218,7 +218,8 @@ bool CoreWorkerTest::WaitForDirectCallActorState(CoreWorker &worker, void CoreWorkerTest::TestNormalTask(std::unordered_map &resources) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", + nullptr); // Test for tasks with by-value and by-ref args. { @@ -260,7 +261,8 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res void CoreWorkerTest::TestActorTask(std::unordered_map &resources, bool is_direct_call) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", + nullptr); auto actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); @@ -349,7 +351,8 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso void CoreWorkerTest::TestActorReconstruction( std::unordered_map &resources, bool is_direct_call) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", + nullptr); // creating actor. auto actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); @@ -407,7 +410,8 @@ void CoreWorkerTest::TestActorReconstruction( void CoreWorkerTest::TestActorFailure(std::unordered_map &resources, bool is_direct_call) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", + nullptr); // creating actor. auto actor_handle = @@ -696,7 +700,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], JobID::FromInt(1), gcs_options_, "", - nullptr); + "127.0.0.1", nullptr); std::unique_ptr actor_handle; std::vector object_ids; @@ -813,7 +817,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) { TEST_F(SingleNodeTest, TestObjectInterface) { CoreWorker core_worker(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], - JobID::FromInt(1), gcs_options_, "", nullptr); + JobID::FromInt(1), gcs_options_, "", "127.0.0.1", nullptr); uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; uint8_t array2[] = {10, 11, 12, 13, 14, 15}; @@ -884,10 +888,12 @@ TEST_F(SingleNodeTest, TestObjectInterface) { TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { CoreWorker worker1(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, "", nullptr); + raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", + nullptr); CoreWorker worker2(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[1], - raylet_socket_names_[1], NextJobId(), gcs_options_, "", nullptr); + raylet_socket_names_[1], NextJobId(), gcs_options_, "", "127.0.0.1", + nullptr); uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; uint8_t array2[] = {10, 11, 12, 13, 14, 15}; diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index 789586591..d68b7a5a6 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -24,6 +24,7 @@ class MockWorker { const gcs::GcsClientOptions &gcs_options) : worker_(WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket, JobID::FromInt(1), gcs_options, /*log_dir=*/"", + /*node_id_address=*/"127.0.0.1", std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4)) {} void Run() { diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index ea71c8831..45ba6cdd7 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -187,7 +187,7 @@ Status AuthenticateRedis(redisAsyncContext *context, const std::string &password } void RedisAsyncContextDisconnectCallback(const redisAsyncContext *context, int status) { - RAY_LOG(INFO) << "Redis async context disconnected. Status: " << status; + RAY_LOG(DEBUG) << "Redis async context disconnected. Status: " << status; // Reset raw 'redisAsyncContext' to nullptr because hiredis will release this context. reinterpret_cast(context->data)->ResetRawRedisAsyncContext(); }