Submit task asynchronously from raylet client (#5313)

This commit is contained in:
simon-mo
2019-07-30 20:25:55 +00:00
parent a20dab4ffc
commit d767b3487f
2 changed files with 25 additions and 8 deletions
+13 -4
View File
@@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import os
import signal
import sys
@@ -311,8 +312,12 @@ def check_components_alive(cluster, component_type, check_component_alive):
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 4
}], indirect=True)
"num_nodes": 4,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 100
}),
}],
indirect=True)
def test_raylet_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all raylets on worker nodes.
@@ -329,8 +334,12 @@ def test_raylet_failed(ray_start_cluster):
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 2
}], indirect=True)
"num_nodes": 2,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 100
}),
}],
indirect=True)
def test_plasma_store_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all plasma stores on worker nodes.
+12 -4
View File
@@ -77,10 +77,18 @@ ray::Status RayletClient::SubmitTask(const ray::TaskSpecification &task_spec) {
SubmitTaskRequest submit_task_request;
submit_task_request.mutable_task_spec()->CopyFrom(task_spec.GetMessage());
grpc::ClientContext context;
SubmitTaskReply reply;
auto status = stub_->SubmitTask(&context, submit_task_request, &reply);
return GrpcStatusToRayStatus(status);
auto callback = [this](const Status &status, const SubmitTaskReply &reply) {
if (!status.ok() && is_connected_) {
is_connected_ = false;
RAY_LOG(INFO) << "Failed to send SubmitTaskRequest, msg: " << status.message();
}
};
auto call =
client_call_manager_.CreateCall<RayletService, SubmitTaskRequest, SubmitTaskReply>(
*stub_, &RayletService::Stub::PrepareAsyncSubmitTask, submit_task_request,
callback);
return call->GetStatus();
}
ray::Status RayletClient::GetTask(std::unique_ptr<ray::TaskSpecification> *task_spec) {