diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index f70e64ea4..a2a7d1560 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -895,7 +895,7 @@ cdef class CoreWorker: c_vector[CTaskArg] args_vector c_vector[CObjectID] return_ids - with profiling.profile("submit_task"): + with self.profile_event("submit_task"): prepare_resources(resources, &c_resources) task_options = CTaskOptions(num_return_vals, c_resources) ray_function = CRayFunction( @@ -954,7 +954,7 @@ cdef class CoreWorker: c_vector[CTaskArg] args_vector c_vector[CObjectID] return_ids - with profiling.profile("submit_task"): + with self.profile_event("submit_task"): prepare_resources(resources, &c_resources) task_options = CTaskOptions(num_return_vals, c_resources) ray_function = CRayFunction( @@ -991,7 +991,7 @@ cdef class CoreWorker: return resources_dict - def profile_event(self, event_type, dict extra_data): + def profile_event(self, event_type, object extra_data=None): cdef: c_string c_event_type = event_type.encode("ascii") diff --git a/python/ray/actor.py b/python/ray/actor.py index f774b99c9..203b83ffc 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -16,7 +16,7 @@ import ray.ray_constants as ray_constants import ray._raylet import ray.signature as signature import ray.worker -from ray import ActorID, ActorClassID, profiling +from ray import ActorID, ActorClassID logger = logging.getLogger(__name__) @@ -468,6 +468,12 @@ class ActorHandle(object): self._ray_class_name = class_name self._ray_actor_method_cpus = actor_method_cpus self._ray_session_and_job = session_and_job + self._ray_function_descriptor_lists = { + method_name: FunctionDescriptor( + self._ray_module_name, method_name, + self._ray_class_name).get_function_descriptor_list() + for method_name in self._ray_method_signatures.keys() + } def _actor_method_call(self, method_name, @@ -500,20 +506,15 @@ class ActorHandle(object): kwargs = kwargs or {} list_args = signature.flatten_args(function_signature, args, kwargs) - function_descriptor = FunctionDescriptor( - self._ray_module_name, method_name, self._ray_class_name) - with profiling.profile("submit_task"): - if worker.mode == ray.LOCAL_MODE: - function = getattr(worker.actors[self._actor_id], method_name) - object_ids = worker.local_mode_manager.execute( - function, function_descriptor, args, kwargs, - num_return_vals) - else: - object_ids = worker.core_worker.submit_actor_task( - self._ray_actor_id, - function_descriptor.get_function_descriptor_list(), - list_args, num_return_vals, - {"CPU": self._ray_actor_method_cpus}) + if worker.mode == ray.LOCAL_MODE: + function = getattr(worker.actors[self._actor_id], method_name) + object_ids = worker.local_mode_manager.execute( + function, method_name, args, kwargs, num_return_vals) + else: + object_ids = worker.core_worker.submit_actor_task( + self._ray_actor_id, + self._ray_function_descriptor_lists[method_name], list_args, + num_return_vals, {"CPU": self._ray_actor_method_cpus}) if len(object_ids) == 1: object_ids = object_ids[0] diff --git a/python/ray/includes/libcoreworker.pxi b/python/ray/includes/libcoreworker.pxi index 3060a2788..8e5a4d92b 100644 --- a/python/ray/includes/libcoreworker.pxi +++ b/python/ray/includes/libcoreworker.pxi @@ -9,10 +9,10 @@ cdef class ProfileEvent: """Cython wrapper class of C++ `ray::worker::ProfileEvent`.""" cdef: unique_ptr[CProfileEvent] inner - dict extra_data + object extra_data @staticmethod - cdef make(unique_ptr[CProfileEvent] event, dict extra_data): + cdef make(unique_ptr[CProfileEvent] event, object extra_data): cdef ProfileEvent self = ProfileEvent.__new__(ProfileEvent) self.inner = move(event) self.extra_data = extra_data @@ -25,7 +25,7 @@ cdef class ProfileEvent: pass def __exit__(self, type, value, tb): - extra_data = {} + extra_data = None if type is not None: extra_data = { "type": str(type), @@ -35,7 +35,7 @@ cdef class ProfileEvent: elif self.extra_data is not None: extra_data = self.extra_data - self.inner.get().SetExtraData(json.dumps(extra_data).encode("ascii")) + self.inner.get().SetExtraData(json.dumps(extra_data).encode("ascii") if extra_data else b"{}") # Deleting the CProfileEvent will add it to a queue to be pushed to # the driver. diff --git a/python/ray/local_mode_manager.py b/python/ray/local_mode_manager.py index d79a0113c..3d9d70a85 100644 --- a/python/ray/local_mode_manager.py +++ b/python/ray/local_mode_manager.py @@ -29,8 +29,7 @@ class LocalModeManager(object): def __init__(self): """Initialize a LocalModeManager.""" - def execute(self, function, function_descriptor, args, kwargs, - num_return_vals): + def execute(self, function, function_name, args, kwargs, num_return_vals): """Synchronously executes a "remote" function or actor method. Stores results directly in the generated and returned @@ -40,7 +39,7 @@ class LocalModeManager(object): Args: function: The function to execute. - function_descriptor: Metadata about the function. + function_name: Name of the function to execute. args: Arguments to the function. These will not be modified by the function execution. kwargs: Keyword arguments to the function. @@ -61,7 +60,6 @@ class LocalModeManager(object): for object_id, result in zip(object_ids, results): object_id.value = result except Exception as e: - function_name = function_descriptor.function_name backtrace = format_error_message(traceback.format_exc()) task_error = RayTaskError(function_name, backtrace, e.__class__) for object_id in object_ids: diff --git a/src/ray/core_worker/profiling.cc b/src/ray/core_worker/profiling.cc index 847d55d01..0ce704d3b 100644 --- a/src/ray/core_worker/profiling.cc +++ b/src/ray/core_worker/profiling.cc @@ -10,7 +10,7 @@ ProfileEvent::ProfileEvent(const std::shared_ptr profiler, const std::string &event_type) : profiler_(profiler) { rpc_event_.set_event_type(event_type); - rpc_event_.set_start_time(current_sys_time_seconds()); + rpc_event_.set_start_time(absl::GetCurrentTimeNanos() / 1e9); } Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_address, @@ -19,6 +19,7 @@ Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_add : io_service_(io_service), timer_(io_service_, boost::asio::chrono::seconds(1)), gcs_client_(gcs_client) { + absl::MutexLock l(&mu_); rpc_profile_data_.set_component_type(WorkerTypeString(worker_context.GetWorkerType())); rpc_profile_data_.set_component_id(worker_context.GetWorkerID().Binary()); rpc_profile_data_.set_node_ip_address(node_ip_address); @@ -26,12 +27,12 @@ Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_add } void Profiler::AddEvent(const rpc::ProfileTableData::ProfileEvent &event) { - io_service_.post([this, event]() -> void { - rpc_profile_data_.add_profile_events()->CopyFrom(event); - }); + absl::MutexLock l(&mu_); + rpc_profile_data_.add_profile_events()->CopyFrom(event); } void Profiler::FlushEvents() { + absl::MutexLock l(&mu_); if (rpc_profile_data_.profile_events_size() != 0) { // TODO(edoakes): this should be migrated to use the new GCS client interface // instead of the raw table interface once it's ready. diff --git a/src/ray/core_worker/profiling.h b/src/ray/core_worker/profiling.h index 8be39ae0f..2d745a6b1 100644 --- a/src/ray/core_worker/profiling.h +++ b/src/ray/core_worker/profiling.h @@ -1,9 +1,11 @@ #ifndef RAY_CORE_WORKER_PROFILING_H #define RAY_CORE_WORKER_PROFILING_H +#include "absl/synchronization/mutex.h" +#include "absl/time/clock.h" + #include "ray/core_worker/context.h" #include "ray/gcs/redis_gcs_client.h" -#include "ray/util/util.h" namespace ray { @@ -30,9 +32,11 @@ class Profiler { // RPC message containing profiling data. Holds the queue of profile events // until they are flushed. - rpc::ProfileTableData rpc_profile_data_; + rpc::ProfileTableData rpc_profile_data_ GUARDED_BY(mu_); std::unique_ptr &gcs_client_; + + absl::Mutex mu_; }; class ProfileEvent { @@ -40,7 +44,7 @@ class ProfileEvent { ProfileEvent(const std::shared_ptr profiler, const std::string &event_type); ~ProfileEvent() { - rpc_event_.set_end_time(current_sys_time_seconds()); + rpc_event_.set_end_time(absl::GetCurrentTimeNanos() / 1e9); profiler_->AddEvent(rpc_event_); } diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 45ba6cdd7..b805a6322 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -27,7 +27,7 @@ void ProcessCallback(int64_t callback_index, auto callback_item = ray::gcs::RedisCallbackManager::instance().get(callback_index); if (!callback_item.is_subscription) { // Record the redis latency for non-subscription redis operations. - auto end_time = current_sys_time_us(); + auto end_time = absl::GetCurrentTimeNanos() / 1000; ray::stats::RedisLatency().Record(end_time - callback_item.start_time); } // Invoke the callback. @@ -134,7 +134,7 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) { } int64_t RedisCallbackManager::add(const RedisCallback &function, bool is_subscription) { - auto start_time = current_sys_time_us(); + auto start_time = absl::GetCurrentTimeNanos() / 1000; std::lock_guard lock(mutex_); callback_items_.emplace(num_callbacks_, diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index dc003a76a..cc7399d01 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -1,10 +1,11 @@ #include "ray/gcs/tables.h" +#include "absl/time/clock.h" + #include "ray/common/common_protocol.h" #include "ray/common/grpc_util.h" #include "ray/common/ray_config.h" #include "ray/gcs/redis_gcs_client.h" -#include "ray/util/util.h" namespace { @@ -713,7 +714,7 @@ Status ActorCheckpointIdTable::AddCheckpointId(const JobID &job_id, const ActorCheckpointIdData &data) { std::shared_ptr copy = std::make_shared(data); - copy->add_timestamps(current_sys_time_ms()); + copy->add_timestamps(absl::GetCurrentTimeNanos() / 1000000); copy->add_checkpoint_ids(checkpoint_id.Binary()); auto num_to_keep = RayConfig::instance().num_actor_checkpoints_to_keep(); while (copy->timestamps().size() > num_to_keep) { @@ -730,7 +731,7 @@ Status ActorCheckpointIdTable::AddCheckpointId(const JobID &job_id, std::shared_ptr data = std::make_shared(); data->set_actor_id(id.Binary()); - data->add_timestamps(current_sys_time_ms()); + data->add_timestamps(absl::GetCurrentTimeNanos() / 1000000); *data->add_checkpoint_ids() = checkpoint_id.Binary(); RAY_CHECK_OK(Add(job_id, actor_id, data, nullptr)); }; diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 84eaaf250..9f94f1242 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -369,9 +369,9 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { // We haven't pushed this specific object to this specific object manager // yet (or if we have then the object must have been evicted and recreated // locally). - recent_pushes[client_id] = current_sys_time_ms(); + recent_pushes[client_id] = absl::GetCurrentTimeNanos() / 1000000; } else { - int64_t current_time = current_sys_time_ms(); + int64_t current_time = absl::GetCurrentTimeNanos() / 1000000; if (current_time - it->second <= RayConfig::instance().object_manager_repeated_push_delay_ms()) { // We pushed this object to the object manager recently, so don't do it @@ -419,7 +419,7 @@ ray::Status ObjectManager::SendObjectChunk( const UniqueID &push_id, const ObjectID &object_id, const ClientID &client_id, uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, std::shared_ptr rpc_client) { - double start_time = current_sys_time_seconds(); + double start_time = absl::GetCurrentTimeNanos() / 1e9; rpc::PushRequest push_request; // Set request header push_request.set_push_id(push_id.Binary()); @@ -459,7 +459,7 @@ ray::Status ObjectManager::SendObjectChunk( << " failed due to" << status.message() << ", chunk index: " << chunk_index; } - double end_time = current_sys_time_seconds(); + double end_time = absl::GetCurrentTimeNanos() / 1e9; HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time, status); }; rpc_client->Push(push_request, callback); @@ -677,10 +677,10 @@ void ObjectManager::HandlePushRequest(const rpc::PushRequest &request, uint64_t data_size = request.data_size(); const std::string &data = request.data(); - double start_time = current_sys_time_seconds(); + double start_time = absl::GetCurrentTimeNanos() / 1e9; auto status = ReceiveObjectChunk(client_id, object_id, data_size, metadata_size, chunk_index, data); - double end_time = current_sys_time_seconds(); + double end_time = absl::GetCurrentTimeNanos() / 1e9; HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time, status); send_reply_callback(status, nullptr, nullptr); @@ -722,7 +722,7 @@ void ObjectManager::HandlePullRequest(const rpc::PullRequest &request, rpc::ProfileTableData::ProfileEvent profile_event; profile_event.set_event_type("receive_pull_request"); - profile_event.set_start_time(current_sys_time_seconds()); + profile_event.set_start_time(absl::GetCurrentTimeNanos() / 1e9); profile_event.set_end_time(profile_event.start_time()); profile_event.set_extra_data("[\"" + object_id.Hex() + "\",\"" + client_id.Hex() + "\"]"); diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 00d5d9eef..47f88bc6b 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -14,6 +14,7 @@ #include #include +#include "absl/time/clock.h" #include "plasma/client.h" #include "ray/common/id.h" diff --git a/src/ray/raylet/reconstruction_policy.h b/src/ray/raylet/reconstruction_policy.h index a44819dca..dd9d23cb5 100644 --- a/src/ray/raylet/reconstruction_policy.h +++ b/src/ray/raylet/reconstruction_policy.h @@ -9,7 +9,6 @@ #include "ray/common/id.h" #include "ray/gcs/tables.h" -#include "ray/util/util.h" #include "ray/object_manager/object_directory.h" diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index 9a8ee6759..4e4c2801b 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -1,5 +1,6 @@ #include +#include "absl/time/clock.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -338,7 +339,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) { // Acquire the task lease for a period longer than the test period. auto task_lease_data = std::make_shared(); task_lease_data->set_node_manager_id(ClientID::FromRandom().Binary()); - task_lease_data->set_acquired_at(current_sys_time_ms()); + task_lease_data->set_acquired_at(absl::GetCurrentTimeNanos() / 1000000); task_lease_data->set_timeout(2 * test_period); mock_gcs_.Add(JobID::Nil(), task_id, task_lease_data); @@ -366,7 +367,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) { SetPeriodicTimer(reconstruction_timeout_ms_ / 2, [this, task_id]() { auto task_lease_data = std::make_shared(); task_lease_data->set_node_manager_id(ClientID::FromRandom().Binary()); - task_lease_data->set_acquired_at(current_sys_time_ms()); + task_lease_data->set_acquired_at(absl::GetCurrentTimeNanos() / 1000000); task_lease_data->set_timeout(reconstruction_timeout_ms_); mock_gcs_.Add(JobID::Nil(), task_id, task_lease_data); }); diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index ab52fe388..83137ef15 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -1,5 +1,7 @@ #include "task_dependency_manager.h" +#include "absl/time/clock.h" + #include "ray/stats/stats.h" namespace ray { @@ -348,7 +350,7 @@ void TaskDependencyManager::AcquireTaskLease(const TaskID &task_id) { auto task_lease_data = std::make_shared(); task_lease_data->set_node_manager_id(client_id_.Hex()); - task_lease_data->set_acquired_at(current_sys_time_ms()); + task_lease_data->set_acquired_at(absl::GetCurrentTimeNanos() / 1000000); task_lease_data->set_timeout(it->second.lease_period); RAY_CHECK_OK(task_lease_table_.Add(JobID::Nil(), task_id, task_lease_data, nullptr)); diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index 7effa44ed..c714ac586 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -6,7 +6,6 @@ #include "ray/common/task/task.h" #include "ray/object_manager/object_manager.h" #include "ray/raylet/reconstruction_policy.h" -#include "ray/util/util.h" // clang-format on namespace ray { diff --git a/src/ray/util/util.h b/src/ray/util/util.h index 793b548a2..1b10ffa8d 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -28,25 +28,6 @@ inline int64_t current_time_ms() { return ms_since_epoch.count(); } -inline int64_t current_sys_time_ms() { - std::chrono::milliseconds ms_since_epoch = - std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()); - return ms_since_epoch.count(); -} - -inline int64_t current_sys_time_us() { - std::chrono::microseconds mu_since_epoch = - std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()); - return mu_since_epoch.count(); -} - -inline double current_sys_time_seconds() { - int64_t microseconds_in_seconds = 1000000; - return static_cast(current_sys_time_us()) / microseconds_in_seconds; -} - inline ray::Status boost_to_ray_status(const boost::system::error_code &error) { switch (error.value()) { case boost::system::errc::success: