diff --git a/python/ray/tests/test_component_failures.py b/python/ray/tests/test_component_failures.py index f2b55208e..e55793ab4 100644 --- a/python/ray/tests/test_component_failures.py +++ b/python/ray/tests/test_component_failures.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import json import os import signal import sys @@ -311,8 +312,12 @@ def check_components_alive(cluster, component_type, check_component_alive): @pytest.mark.parametrize( "ray_start_cluster", [{ "num_cpus": 8, - "num_nodes": 4 - }], indirect=True) + "num_nodes": 4, + "_internal_config": json.dumps({ + "num_heartbeats_timeout": 100 + }), + }], + indirect=True) def test_raylet_failed(ray_start_cluster): cluster = ray_start_cluster # Kill all raylets on worker nodes. @@ -329,8 +334,12 @@ def test_raylet_failed(ray_start_cluster): @pytest.mark.parametrize( "ray_start_cluster", [{ "num_cpus": 8, - "num_nodes": 2 - }], indirect=True) + "num_nodes": 2, + "_internal_config": json.dumps({ + "num_heartbeats_timeout": 100 + }), + }], + indirect=True) def test_plasma_store_failed(ray_start_cluster): cluster = ray_start_cluster # Kill all plasma stores on worker nodes. diff --git a/src/ray/rpc/raylet/raylet_client.cc b/src/ray/rpc/raylet/raylet_client.cc index c6cfc2f05..83272fae1 100644 --- a/src/ray/rpc/raylet/raylet_client.cc +++ b/src/ray/rpc/raylet/raylet_client.cc @@ -77,10 +77,18 @@ ray::Status RayletClient::SubmitTask(const ray::TaskSpecification &task_spec) { SubmitTaskRequest submit_task_request; submit_task_request.mutable_task_spec()->CopyFrom(task_spec.GetMessage()); - grpc::ClientContext context; - SubmitTaskReply reply; - auto status = stub_->SubmitTask(&context, submit_task_request, &reply); - return GrpcStatusToRayStatus(status); + auto callback = [this](const Status &status, const SubmitTaskReply &reply) { + if (!status.ok() && is_connected_) { + is_connected_ = false; + RAY_LOG(INFO) << "Failed to send SubmitTaskRequest, msg: " << status.message(); + } + }; + + auto call = + client_call_manager_.CreateCall( + *stub_, &RayletService::Stub::PrepareAsyncSubmitTask, submit_task_request, + callback); + return call->GetStatus(); } ray::Status RayletClient::GetTask(std::unique_ptr *task_spec) {