diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py
index b248c84e3..3a929a677 100644
--- a/python/ray/tests/test_basic.py
+++ b/python/ray/tests/test_basic.py
@@ -1253,6 +1253,46 @@ def test_direct_call_simple(ray_start_cluster):
         range(1, 101))
 
 
+# https://github.com/ray-project/ray/issues/6329
+def test_call_actors_indirect_through_tasks(ray_start_regular):
+    """Call one actor from inside several remote tasks.
+
+    Regression test for the issue linked above. The same actor handle is
+    passed to three remote functions, either directly or nested in a list,
+    and each of them invokes the actor and returns the result.
+    """
+
+    @ray.remote
+    class Counter(object):
+        def __init__(self, value):
+            self.value = int(value)
+
+        def increase(self, delta):
+            self.value += int(delta)
+            return self.value
+
+    @ray.remote
+    def foo(actor):
+        return ray.get(actor.increase.remote(1))
+
+    @ray.remote
+    def bar(actor):
+        return ray.get(actor.increase.remote(1))
+
+    @ray.remote
+    def zoo(actors):
+        return ray.get(actors[0].increase.remote(1))
+
+    c = Counter.remote(0)
+    expected = 0
+    for _ in range(100):
+        for task, arg in ((foo, c), (bar, c), (zoo, [c])):
+            # The outer ray.get blocks until the task is done, so the calls
+            # reach the actor one at a time and the n-th call must return n.
+            expected += 1
+            assert ray.get(task.remote(arg)) == expected
+
+
 def test_direct_call_refcount(ray_start_regular):
     @ray.remote
     def f(x):
diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc
index 1826f512f..aa475d04b 100644
--- a/src/ray/core_worker/transport/direct_actor_transport.cc
+++ b/src/ray/core_worker/transport/direct_actor_transport.cc
@@ -7,15 +7,19 @@ using ray::rpc::ActorTableData;
 
 namespace ray {
 
-int64_t GetRequestNumber(const std::unique_ptr &request) {
-  return request->task_spec().actor_task_spec().actor_counter();
-}
-
 Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
   RAY_LOG(DEBUG) << "Submitting task " << task_spec.TaskId();
   RAY_CHECK(task_spec.IsActorTask());
 
-  resolver_.ResolveDependencies(task_spec, [this, task_spec]() mutable {
+  // We must fix the send order prior to resolving dependencies, which may complete
+  // out of order. This ensures we preserve the client-side send order.
+ int64_t send_pos = -1; + { + absl::MutexLock lock(&mu_); + send_pos = next_send_position_to_assign_[task_spec.ActorId()]++; + } + + resolver_.ResolveDependencies(task_spec, [this, send_pos, task_spec]() mutable { const auto &actor_id = task_spec.ActorId(); const auto task_id = task_spec.TaskId(); @@ -25,7 +29,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe // access the task. request->mutable_task_spec()->CopyFrom(task_spec.GetMessage()); - std::unique_lock guard(mutex_); + absl::MutexLock lock(&mu_); auto iter = actor_states_.find(actor_id); if (iter == actor_states_.end() || @@ -36,13 +40,11 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe // actor handle (e.g. from unpickling), in that case it might be desirable // to have a timeout to mark it as invalid if it doesn't show up in the // specified time. - auto inserted = pending_requests_[actor_id].emplace(GetRequestNumber(request), - std::move(request)); + auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request)); RAY_CHECK(inserted.second); RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created."; } else if (iter->second.state_ == ActorTableData::ALIVE) { - auto inserted = pending_requests_[actor_id].emplace(GetRequestNumber(request), - std::move(request)); + auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request)); RAY_CHECK(inserted.second); SendPendingTasks(actor_id); } else { @@ -59,7 +61,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate( const ActorID &actor_id, const ActorTableData &actor_data) { - std::unique_lock guard(mutex_); + absl::MutexLock lock(&mu_); actor_states_.erase(actor_id); actor_states_.emplace( actor_id, ActorStateData(actor_data.state(), actor_data.address().ip_address(), @@ -93,7 +95,8 @@ void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate( 
pending_requests_.erase(pending_it); } - next_sequence_number_.erase(actor_id); + next_send_position_.erase(actor_id); + next_send_position_to_assign_.erase(actor_id); // No need to clean up tasks that have been sent and are waiting for // replies. They will be treated as failed once the connection dies. @@ -106,7 +109,7 @@ void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_i // Submit all pending requests. auto &requests = pending_requests_[actor_id]; auto head = requests.begin(); - while (head != requests.end() && head->first == next_sequence_number_[actor_id]) { + while (head != requests.end() && head->first == next_send_position_[actor_id]) { auto request = std::move(head->second); head = requests.erase(head); @@ -120,11 +123,7 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask( rpc::CoreWorkerClientInterface &client, std::unique_ptr request, const ActorID &actor_id, const TaskID &task_id, int num_returns) { RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id; - - auto task_number = GetRequestNumber(request); - RAY_CHECK(next_sequence_number_[actor_id] == task_number) - << "Counter was " << task_number << " expected " << next_sequence_number_[actor_id]; - next_sequence_number_[actor_id]++; + next_send_position_[actor_id]++; RAY_CHECK_OK(client.PushActorTask( std::move(request), @@ -138,7 +137,7 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask( } bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const { - std::unique_lock guard(mutex_); + absl::MutexLock lock(&mu_); auto iter = actor_states_.find(actor_id); return (iter != actor_states_.end() && iter->second.state_ == ActorTableData::ALIVE); diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index c93e860a9..33c9dc172 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ 
-79,7 +79,8 @@ class CoreWorkerDirectActorTaskSubmitter { /// \return Void. void PushActorTask(rpc::CoreWorkerClientInterface &client, std::unique_ptr request, - const ActorID &actor_id, const TaskID &task_id, int num_returns); + const ActorID &actor_id, const TaskID &task_id, int num_returns) + EXCLUSIVE_LOCKS_REQUIRED(mu_); /// Send all pending tasks for an actor. /// Note that this function doesn't take lock, the caller is expected to hold @@ -87,7 +88,7 @@ class CoreWorkerDirectActorTaskSubmitter { /// /// \param[in] actor_id Actor ID. /// \return Void. - void SendPendingTasks(const ActorID &actor_id); + void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_); /// Whether the specified actor is alive. /// @@ -99,27 +100,32 @@ class CoreWorkerDirectActorTaskSubmitter { rpc::ClientFactoryFn client_factory_; - /// Mutex to proect the various maps below. - mutable std::mutex mutex_; + /// Mutex to protect the various maps below. + mutable absl::Mutex mu_; /// Map from actor id to actor state. This only includes actors that we send tasks to. - std::unordered_map actor_states_; + absl::flat_hash_map actor_states_ GUARDED_BY(mu_); /// Map from actor id to rpc client. This only includes actors that we send tasks to. /// We use shared_ptr to enable shared_from_this for pending client callbacks. /// /// TODO(zhijunfu): this will be moved into `actor_states_` later when we can /// subscribe updates for a specific actor. - std::unordered_map> - rpc_clients_; + absl::flat_hash_map> + rpc_clients_ GUARDED_BY(mu_); /// Map from actor id to the actor's pending requests. Each actor's requests - /// are ordered by the task number in the request. + /// are ordered by the send position assigned to them in SubmitTask. absl::flat_hash_map>> - pending_requests_; + pending_requests_ GUARDED_BY(mu_); - /// Map from actor id to the sequence number of the next task to send to that - /// actor. - std::unordered_map next_sequence_number_; + /// Map from actor id to the send position of the next task to queue for send + /// for that actor. 
This is always greater than or equal to next_send_position_. + absl::flat_hash_map next_send_position_to_assign_ GUARDED_BY(mu_); + + /// Map from actor id to the send position of the next task to send to that actor. + /// Note that this differs from the PushTaskRequest's sequence number in that it + /// increases monotonically in this process independently of CallerId changes. + absl::flat_hash_map next_send_position_ GUARDED_BY(mu_); /// Resolve direct call object dependencies; LocalDependencyResolver resolver_; @@ -307,6 +313,7 @@ class SchedulingQueue { << client_processed_up_to; next_seq_no_ = client_processed_up_to + 1; } + RAY_LOG(DEBUG) << "Enqueue " << seq_no << " cur seqno " << next_seq_no_; pending_tasks_[seq_no] = InboundRequest(accept_request, reject_request, dependencies.size() > 0); if (dependencies.size() > 0) {