Remove non-direct-call code from core worker (#7625)

This commit is contained in:
Edward Oakes
2020-03-22 19:20:08 -05:00
committed by GitHub
parent 81d311031b
commit 8b4f5a9431
13 changed files with 109 additions and 207 deletions
+3 -3
View File
@@ -828,7 +828,7 @@ cdef class CoreWorker:
with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
task_options = CTaskOptions(
num_return_vals, True, c_resources)
num_return_vals, c_resources)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args(self, args, &args_vector)
@@ -870,7 +870,7 @@ cdef class CoreWorker:
check_status(self.core_worker.get().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_reconstructions, True, max_concurrency,
max_reconstructions, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached, is_asyncio),
extension_data,
@@ -897,7 +897,7 @@ cdef class CoreWorker:
with self.profile_event(b"submit_task"):
if num_method_cpus > 0:
c_resources[b"CPU"] = num_method_cpus
task_options = CTaskOptions(num_return_vals, False, c_resources)
task_options = CTaskOptions(num_return_vals, c_resources)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args(self, args, &args_vector)
+2 -2
View File
@@ -217,13 +217,13 @@ cdef extern from "ray/core_worker/common.h" nogil:
cdef cppclass CTaskOptions "ray::TaskOptions":
CTaskOptions()
CTaskOptions(int num_returns, c_bool is_direct_call,
CTaskOptions(int num_returns,
unordered_map[c_string, double] &resources)
cdef cppclass CActorCreationOptions "ray::ActorCreationOptions":
CActorCreationOptions()
CActorCreationOptions(
uint64_t max_reconstructions, c_bool is_direct_call,
uint64_t max_reconstructions,
int32_t max_concurrency,
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
+7 -10
View File
@@ -21,7 +21,7 @@ namespace {
ray::rpc::ActorHandle CreateInnerActorHandle(
const class ActorID &actor_id, const TaskID &owner_id,
const ray::rpc::Address &owner_address, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call,
const ObjectID &initial_cursor, const Language actor_language,
const ray::FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data) {
ray::rpc::ActorHandle inner;
@@ -33,7 +33,6 @@ ray::rpc::ActorHandle CreateInnerActorHandle(
*inner.mutable_actor_creation_task_function_descriptor() =
actor_creation_task_function_descriptor->GetMessage();
inner.set_actor_cursor(initial_cursor.Binary());
inner.set_is_direct_call(is_direct_call);
inner.set_extension_data(extension_data);
return inner;
}
@@ -51,25 +50,23 @@ namespace ray {
ActorHandle::ActorHandle(
const class ActorID &actor_id, const TaskID &owner_id,
const rpc::Address &owner_address, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call,
const ObjectID &initial_cursor, const Language actor_language,
const ray::FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data)
: ActorHandle(CreateInnerActorHandle(
actor_id, owner_id, owner_address, job_id, initial_cursor, actor_language,
is_direct_call, actor_creation_task_function_descriptor, extension_data)) {}
actor_creation_task_function_descriptor, extension_data)) {}
/// Construct an actor handle from its serialized string form.
/// Delegates to the constructor taking an inner handle, using the value
/// produced by CreateInnerActorHandleFromString(serialized).
ActorHandle::ActorHandle(const std::string &serialized)
: ActorHandle(CreateInnerActorHandleFromString(serialized)) {}
void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder,
const TaskTransportType transport_type,
const ObjectID new_cursor) {
void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor) {
absl::MutexLock guard(&mutex_);
// Build actor task spec.
const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(GetActorID());
const ObjectID actor_creation_dummy_object_id =
ObjectID::ForTaskReturn(actor_creation_task_id, /*index=*/1,
/*transport_type=*/static_cast<int>(transport_type));
const ObjectID actor_creation_dummy_object_id = ObjectID::ForTaskReturn(
actor_creation_task_id, /*index=*/1,
/*transport_type=*/static_cast<int>(TaskTransportType::DIRECT));
builder.SetActorTaskSpec(GetActorID(), actor_creation_dummy_object_id,
/*previous_actor_task_dummy_object_id=*/actor_cursor_,
task_counter_++);
+1 -5
View File
@@ -35,7 +35,6 @@ class ActorHandle {
ActorHandle(const ActorID &actor_id, const TaskID &owner_id,
const rpc::Address &owner_address, const JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
bool is_direct_call,
const ray::FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data);
@@ -61,10 +60,7 @@ class ActorHandle {
std::string ExtensionData() const { return inner_.extension_data(); }
bool IsDirectCallActor() const { return inner_.is_direct_call(); }
void SetActorTaskSpec(TaskSpecBuilder &builder, const TaskTransportType transport_type,
const ObjectID new_cursor);
void SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor);
void Serialize(std::string *output);
+4 -12
View File
@@ -99,14 +99,11 @@ class TaskArg {
/// Options for all tasks (actor and non-actor) except for actor creation.
struct TaskOptions {
TaskOptions() {}
TaskOptions(int num_returns, bool is_direct_call,
std::unordered_map<std::string, double> &resources)
: num_returns(num_returns), is_direct_call(is_direct_call), resources(resources) {}
TaskOptions(int num_returns, std::unordered_map<std::string, double> &resources)
: num_returns(num_returns), resources(resources) {}
/// Number of returns of this task.
int num_returns = 1;
/// Whether to use the direct task transport.
bool is_direct_call = false;
/// Resources required by this task.
std::unordered_map<std::string, double> resources;
};
@@ -114,14 +111,12 @@ struct TaskOptions {
/// Options for actor creation tasks.
struct ActorCreationOptions {
ActorCreationOptions() {}
ActorCreationOptions(uint64_t max_reconstructions, bool is_direct_call,
int max_concurrency,
ActorCreationOptions(uint64_t max_reconstructions, int max_concurrency,
const std::unordered_map<std::string, double> &resources,
const std::unordered_map<std::string, double> &placement_resources,
const std::vector<std::string> &dynamic_worker_options,
bool is_detached, bool is_asyncio)
: max_reconstructions(max_reconstructions),
is_direct_call(is_direct_call),
max_concurrency(max_concurrency),
resources(resources),
placement_resources(placement_resources),
@@ -132,9 +127,6 @@ struct ActorCreationOptions {
/// Maximum number of times that the actor should be reconstructed when it dies
/// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed.
const uint64_t max_reconstructions = 0;
/// Whether to use direct actor call. If this is set to true, callers will submit
/// tasks directly to the created actor without going through raylet.
const bool is_direct_call = false;
/// The max number of concurrent tasks to run on this direct call actor.
const int max_concurrency = 1;
/// Resources required by the whole lifetime of this actor.
@@ -147,7 +139,7 @@ struct ActorCreationOptions {
/// Whether to keep the actor persistent after driver exit. If true, this will set
/// the worker to not be destroyed after the driver shutdown.
const bool is_detached = false;
/// Whether to use async mode of direct actor call. is_direct_call must be true.
/// Whether to use async mode of direct actor call.
const bool is_asyncio = false;
};
+1
View File
@@ -147,6 +147,7 @@ bool WorkerContext::ShouldReleaseResourcesOnBlockingCalls() const {
CurrentThreadIsMain();
}
// TODO(edoakes): simplify these checks now that we only support direct call mode.
/// Returns the value of the `current_actor_is_direct_call_` member of this
/// worker context. This accessor does not modify any state (it is `const`).
bool WorkerContext::CurrentActorIsDirectCall() const {
return current_actor_is_direct_call_;
}
+39 -85
View File
@@ -35,13 +35,13 @@ void BuildCommonTaskSpec(
const std::vector<ray::TaskArg> &args, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
ray::TaskTransportType transport_type, std::vector<ObjectID> *return_ids) {
std::vector<ObjectID> *return_ids) {
// Build common task spec.
builder.SetCommonTaskSpec(task_id, function.GetLanguage(),
function.GetFunctionDescriptor(), job_id, current_task_id,
task_index, caller_id, address, num_returns,
transport_type == ray::TaskTransportType::DIRECT,
required_resources, required_placement_resources);
/*is_direct_transport_type=*/true, required_resources,
required_placement_resources);
// Set task arguments.
for (const auto &arg : args) {
if (arg.IsPassedByReference()) {
@@ -54,9 +54,8 @@ void BuildCommonTaskSpec(
// Compute return IDs.
return_ids->resize(num_returns);
for (size_t i = 0; i < num_returns; i++) {
(*return_ids)[i] =
ObjectID::ForTaskReturn(task_id, i + 1,
/*transport_type=*/static_cast<int>(transport_type));
(*return_ids)[i] = ObjectID::ForTaskReturn(
task_id, i + 1, static_cast<int>(ray::TaskTransportType::DIRECT));
}
}
@@ -799,21 +798,14 @@ Status CoreWorker::SubmitTask(const RayFunction &function,
const std::unordered_map<std::string, double> required_resources;
// TODO(ekl) offload task building onto a thread pool for performance
BuildCommonTaskSpec(
builder, worker_context_.GetCurrentJobID(), task_id,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_,
function, args, task_options.num_returns, task_options.resources,
required_resources,
task_options.is_direct_call ? TaskTransportType::DIRECT : TaskTransportType::RAYLET,
return_ids);
BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, task_options.num_returns,
task_options.resources, required_resources, return_ids);
TaskSpecification task_spec = builder.Build();
if (task_options.is_direct_call) {
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite(), max_retries);
return direct_task_submitter_->SubmitTask(task_spec);
} else {
return local_raylet_client_->SubmitTask(task_spec);
}
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, CurrentCallSite(),
max_retries);
return direct_task_submitter_->SubmitTask(task_spec);
}
Status CoreWorker::CreateActor(const RayFunction &function,
@@ -832,33 +824,24 @@ Status CoreWorker::CreateActor(const RayFunction &function,
BuildCommonTaskSpec(builder, job_id, actor_creation_task_id,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, 1, actor_creation_options.resources,
actor_creation_options.placement_resources,
actor_creation_options.is_direct_call ? TaskTransportType::DIRECT
: TaskTransportType::RAYLET,
&return_ids);
actor_creation_options.placement_resources, &return_ids);
builder.SetActorCreationTaskSpec(
actor_id, actor_creation_options.max_reconstructions,
actor_creation_options.dynamic_worker_options,
actor_creation_options.is_direct_call, actor_creation_options.max_concurrency,
/*is_direct_call=*/true, actor_creation_options.max_concurrency,
actor_creation_options.is_detached, actor_creation_options.is_asyncio);
*return_actor_id = actor_id;
TaskSpecification task_spec = builder.Build();
Status status;
if (actor_creation_options.is_direct_call) {
task_manager_->AddPendingTask(
GetCallerId(), rpc_address_, task_spec, CurrentCallSite(),
std::max(RayConfig::instance().actor_creation_min_retries(),
actor_creation_options.max_reconstructions));
status = direct_task_submitter_->SubmitTask(task_spec);
} else {
status = local_raylet_client_->SubmitTask(task_spec);
}
task_manager_->AddPendingTask(
GetCallerId(), rpc_address_, task_spec, CurrentCallSite(),
std::max(RayConfig::instance().actor_creation_min_retries(),
actor_creation_options.max_reconstructions));
Status status = direct_task_submitter_->SubmitTask(task_spec);
std::unique_ptr<ActorHandle> actor_handle(new ActorHandle(
actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0],
function.GetLanguage(), actor_creation_options.is_direct_call,
function.GetFunctionDescriptor(), extension_data));
function.GetLanguage(), function.GetFunctionDescriptor(), extension_data));
RAY_CHECK(AddActorHandle(std::move(actor_handle),
/*is_owner_handle=*/!actor_creation_options.is_detached))
<< "Actor " << actor_id << " already exists";
@@ -875,10 +858,6 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f
// Add one for actor cursor object id for tasks.
const int num_returns = task_options.num_returns + 1;
const bool is_direct_call = actor_handle->IsDirectCallActor();
const TaskTransportType transport_type =
is_direct_call ? TaskTransportType::DIRECT : TaskTransportType::RAYLET;
// Build common task spec.
TaskSpecBuilder builder;
const int next_task_index = worker_context_.GetNextTaskIndex();
@@ -889,28 +868,24 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f
BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, num_returns, task_options.resources,
required_resources, transport_type, return_ids);
required_resources, return_ids);
const ObjectID new_cursor = return_ids->back();
actor_handle->SetActorTaskSpec(builder, transport_type, new_cursor);
actor_handle->SetActorTaskSpec(builder, new_cursor);
// Remove cursor from return ids.
return_ids->pop_back();
// Submit task.
Status status;
TaskSpecification task_spec = builder.Build();
if (is_direct_call) {
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite());
if (actor_handle->IsDead()) {
auto status = Status::IOError("sent task to dead actor");
task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED,
&status);
} else {
status = direct_actor_submitter_->SubmitTask(task_spec);
}
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite());
if (actor_handle->IsDead()) {
auto status = Status::IOError("sent task to dead actor");
task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED,
&status);
} else {
RAY_CHECK_OK(local_raylet_client_->SubmitTask(task_spec));
status = direct_actor_submitter_->SubmitTask(task_spec);
}
return status;
}
@@ -919,7 +894,6 @@ Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill,
bool no_reconstruction) {
ActorHandle *actor_handle = nullptr;
RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle));
RAY_CHECK(actor_handle->IsDirectCallActor());
direct_actor_submitter_->KillActor(actor_id, force_kill, no_reconstruction);
return Status::OK();
}
@@ -976,14 +950,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
absl::MutexLock lock(&actor_handles_mutex_);
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
if (it->second->IsDirectCallActor()) {
// We have to reset the actor handle since the next instance of the
// actor will not have the last sequence number that we sent.
// TODO: Remove the check for direct calls. We do not reset for the
// raylet codepath because it tries to replay all tasks since the
// last actor checkpoint.
it->second->Reset();
}
// We have to reset the actor handle since the next instance of the
// actor will not have the last sequence number that we sent.
it->second->Reset();
direct_actor_submitter_->DisconnectActor(actor_id, false);
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
direct_actor_submitter_->DisconnectActor(actor_id, true);
@@ -1073,9 +1042,8 @@ Status CoreWorker::AllocateReturnObjects(
}
// Allocate a buffer for the return object.
if (worker_context_.CurrentTaskIsDirectCall() &&
static_cast<int64_t>(data_sizes[i]) <
RayConfig::instance().max_direct_call_object_size()) {
if (static_cast<int64_t>(data_sizes[i]) <
RayConfig::instance().max_direct_call_object_size()) {
data_buffer = std::make_shared<LocalMemoryBuffer>(data_sizes[i]);
} else {
RAY_RETURN_NOT_OK(
@@ -1128,12 +1096,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
reference_counter_->AddLocalReference(borrowed_id, task_spec.CallSiteString());
}
const auto transport_type = worker_context_.CurrentTaskIsDirectCall()
? TaskTransportType::DIRECT
: TaskTransportType::RAYLET;
std::vector<ObjectID> return_ids;
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
return_ids.push_back(task_spec.ReturnId(i, transport_type));
return_ids.push_back(task_spec.ReturnId(i, TaskTransportType::DIRECT));
}
Status status;
@@ -1173,11 +1138,6 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object "
<< return_ids[i] << " in store: " << status.message();
}
} else if (!worker_context_.CurrentTaskIsDirectCall()) {
if (!Put(*return_objects->at(i), {}, return_ids[i]).ok()) {
RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object "
<< return_ids[i] << " in store: " << status.message();
}
}
}
@@ -1305,16 +1265,10 @@ void CoreWorker::HandleAssignTask(const rpc::AssignTaskRequest &request,
return;
}
if (worker_context_.CurrentActorIsDirectCall()) {
send_reply_callback(Status::Invalid("This actor only accepts direct calls."), nullptr,
nullptr);
return;
} else {
task_queue_length_ += 1;
task_execution_service_.post([=] {
raylet_task_receiver_->HandleAssignTask(request, reply, send_reply_callback);
});
}
task_queue_length_ += 1;
task_execution_service_.post([=] {
raylet_task_receiver_->HandleAssignTask(request, reply, send_reply_callback);
});
}
void CoreWorker::HandlePushTask(const rpc::PushTaskRequest &request,
@@ -85,7 +85,7 @@ inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject call
resources = ToResources(env, java_resources);
}
ray::TaskOptions task_options{numReturns, /*use_direct_call=*/true, resources};
ray::TaskOptions task_options{numReturns, resources};
return task_options;
}
@@ -113,7 +113,6 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
ray::ActorCreationOptions actor_creation_options{
static_cast<uint64_t>(max_reconstructions),
/*use_direct_call=*/true,
static_cast<int>(max_concurrency),
resources,
resources,
+40 -64
View File
@@ -59,7 +59,7 @@ static void flushall_redis(void) {
ActorID CreateActorHelper(CoreWorker &worker,
std::unordered_map<std::string, double> &resources,
bool is_direct_call, uint64_t max_reconstructions) {
uint64_t max_reconstructions) {
std::unique_ptr<ActorHandle> actor_handle;
uint8_t array[] = {1, 2, 3};
@@ -71,8 +71,8 @@ ActorID CreateActorHelper(CoreWorker &worker,
args.emplace_back(TaskArg::PassByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>())));
ActorCreationOptions actor_options{max_reconstructions, is_direct_call,
/*max_concurrency*/ 1, resources, resources, {},
ActorCreationOptions actor_options{max_reconstructions,
/*max_concurrency*/ 1, resources, resources, {},
/*is_detached*/ false,
/*is_asyncio*/ false};
@@ -258,20 +258,17 @@ class CoreWorkerTest : public ::testing::Test {
void TestNormalTask(std::unordered_map<std::string, double> &resources);
// Test actor tasks.
void TestActorTask(std::unordered_map<std::string, double> &resources,
bool is_direct_call);
void TestActorTask(std::unordered_map<std::string, double> &resources);
// Test actor failure case, verify that the tasks would either succeed or
// fail with exceptions, in that case the return objects fetched from `Get`
// contain errors.
void TestActorFailure(std::unordered_map<std::string, double> &resources,
bool is_direct_call);
void TestActorFailure(std::unordered_map<std::string, double> &resources);
// Test actor failover case. Verify that actor can be reconstructed successfully,
// and as long as we wait for actor reconstruction before submitting new tasks,
// it is guaranteed that all tasks are successfully completed.
void TestActorReconstruction(std::unordered_map<std::string, double> &resources,
bool is_direct_call);
void TestActorReconstruction(std::unordered_map<std::string, double> &resources);
protected:
bool WaitForDirectCallActorState(CoreWorker &worker, const ActorID &actor_id,
@@ -279,8 +276,7 @@ class CoreWorkerTest : public ::testing::Test {
// Get the pid for the worker process that runs the actor.
int GetActorPid(CoreWorker &worker, const ActorID &actor_id,
std::unordered_map<std::string, double> &resources,
bool is_direct_call);
std::unordered_map<std::string, double> &resources);
std::vector<std::string> raylet_socket_names_;
std::vector<std::string> raylet_store_socket_names_;
@@ -301,10 +297,9 @@ bool CoreWorkerTest::WaitForDirectCallActorState(CoreWorker &worker,
}
int CoreWorkerTest::GetActorPid(CoreWorker &worker, const ActorID &actor_id,
std::unordered_map<std::string, double> &resources,
bool is_direct_call) {
std::unordered_map<std::string, double> &resources) {
std::vector<TaskArg> args;
TaskOptions options{1, is_direct_call, resources};
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"GetWorkerPid", "", "", "")};
@@ -348,8 +343,6 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map<std::string, double> &res
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
TaskOptions options;
options.is_direct_call = true;
std::vector<ObjectID> return_ids;
RAY_CHECK_OK(
driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0));
@@ -370,13 +363,12 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map<std::string, double> &res
}
}
void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &resources,
bool is_direct_call) {
void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &resources) {
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1",
node_manager_port, nullptr);
auto actor_id = CreateActorHelper(driver, resources, is_direct_call, 1000);
auto actor_id = CreateActorHelper(driver, resources, 1000);
// Test submitting some tasks with by-value args for that actor.
{
@@ -392,7 +384,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
args.emplace_back(TaskArg::PassByValue(
std::make_shared<RayObject>(buffer2, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, false, resources};
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -401,7 +393,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
ASSERT_EQ(return_ids.size(), 1);
ASSERT_TRUE(return_ids[0].IsReturnObject());
ASSERT_EQ(static_cast<TaskTransportType>(return_ids[0].GetTransportType()),
is_direct_call ? TaskTransportType::DIRECT : TaskTransportType::RAYLET);
TaskTransportType::DIRECT);
std::vector<std::shared_ptr<ray::RayObject>> results;
RAY_CHECK_OK(driver.Get(return_ids, -1, &results));
@@ -437,7 +429,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
args.emplace_back(TaskArg::PassByValue(
std::make_shared<RayObject>(buffer2, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, false, resources};
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -459,19 +451,19 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
}
void CoreWorkerTest::TestActorReconstruction(
std::unordered_map<std::string, double> &resources, bool is_direct_call) {
std::unordered_map<std::string, double> &resources) {
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1",
node_manager_port, nullptr);
// creating actor.
auto actor_id = CreateActorHelper(driver, resources, is_direct_call, 1000);
auto actor_id = CreateActorHelper(driver, resources, 1000);
// Wait for actor alive event.
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_id, true, 30 * 1000 /* 30s */));
RAY_LOG(INFO) << "actor has been created";
auto pid = GetActorPid(driver, actor_id, resources, is_direct_call);
auto pid = GetActorPid(driver, actor_id, resources);
RAY_CHECK(pid != -1);
// Test submitting some tasks with by-value args for that actor.
@@ -485,9 +477,9 @@ void CoreWorkerTest::TestActorReconstruction(
ASSERT_EQ(system("pkill mock_worker"), 0);
// Wait for actor reconstruction event, and then for alive event.
auto check_actor_restart_func = [this, pid, &driver, &actor_id, &resources,
is_direct_call]() -> bool {
auto new_pid = GetActorPid(driver, actor_id, resources, is_direct_call);
auto check_actor_restart_func = [this, pid, &driver, &actor_id,
&resources]() -> bool {
auto new_pid = GetActorPid(driver, actor_id, resources);
return new_pid != -1 && new_pid != pid;
};
ASSERT_TRUE(WaitForCondition(check_actor_restart_func, 30 * 1000 /* 30s */));
@@ -503,7 +495,7 @@ void CoreWorkerTest::TestActorReconstruction(
args.emplace_back(TaskArg::PassByValue(
std::make_shared<RayObject>(buffer1, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, false, resources};
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -520,15 +512,14 @@ void CoreWorkerTest::TestActorReconstruction(
}
}
void CoreWorkerTest::TestActorFailure(std::unordered_map<std::string, double> &resources,
bool is_direct_call) {
void CoreWorkerTest::TestActorFailure(
std::unordered_map<std::string, double> &resources) {
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1",
node_manager_port, nullptr);
// creating actor.
auto actor_id =
CreateActorHelper(driver, resources, is_direct_call, 0 /* not reconstructable */);
auto actor_id = CreateActorHelper(driver, resources, 0 /* not reconstructable */);
// Test submitting some tasks with by-value args for that actor.
{
@@ -549,7 +540,7 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map<std::string, double> &r
args.emplace_back(TaskArg::PassByValue(
std::make_shared<RayObject>(buffer1, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, false, resources};
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -626,7 +617,6 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
std::unordered_map<std::string, double> resources;
ActorCreationOptions actor_options{0,
/*is_direct_call*/ true,
1,
resources,
resources,
@@ -636,8 +626,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
const auto job_id = NextJobId();
ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1),
TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), true, function.GetFunctionDescriptor(),
"");
function.GetLanguage(), function.GetFunctionDescriptor(), "");
// Manually create `num_tasks` task specs, and for each of them create a
// `PushTaskRequest`, this is to batch performance of TaskSpec
@@ -647,7 +636,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
RAY_LOG(INFO) << "start creating " << num_tasks << " PushTaskRequests";
rpc::Address address;
for (int i = 0; i < num_tasks; i++) {
TaskOptions options{1, false, resources};
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
auto num_returns = options.num_returns;
@@ -665,8 +654,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
}
}
actor_handle.SetActorTaskSpec(builder, TaskTransportType::RAYLET,
ObjectID::FromRandom());
actor_handle.SetActorTaskSpec(builder, ObjectID::FromRandom());
auto task_spec = builder.Build();
@@ -686,7 +674,6 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
// Create an actor.
std::unordered_map<std::string, double> resources;
auto actor_id = CreateActorHelper(driver, resources,
/*is_direct_call=*/true,
/*max_reconstructions=*/0);
// wait for actor creation finish.
ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_id, true, 30 * 1000 /* 30s */));
@@ -703,7 +690,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
args.emplace_back(TaskArg::PassByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>())));
TaskOptions options{1, false, resources};
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
@@ -753,7 +740,7 @@ TEST_F(ZeroNodeTest, TestActorHandle) {
JobID job_id = NextJobId();
ActorHandle original(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0),
TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
Language::PYTHON, /*is_direct_call=*/false,
Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""), "");
std::string output;
original.Serialize(&output);
@@ -982,46 +969,35 @@ TEST_F(TwoNodeTest, TestNormalTaskCrossNodes) {
TEST_F(SingleNodeTest, TestActorTaskLocal) {
std::unordered_map<std::string, double> resources;
TestActorTask(resources, false);
TestActorTask(resources);
}
TEST_F(TwoNodeTest, TestActorTaskCrossNodes) {
std::unordered_map<std::string, double> resources;
resources.emplace("resource1", 1);
TestActorTask(resources, false);
TestActorTask(resources);
}
TEST_F(SingleNodeTest, TestDirectActorTaskLocal) {
TEST_F(SingleNodeTest, TestActorTaskLocalReconstruction) {
std::unordered_map<std::string, double> resources;
TestActorTask(resources, true);
TestActorReconstruction(resources);
}
TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodes) {
TEST_F(TwoNodeTest, TestActorTaskCrossNodesReconstruction) {
std::unordered_map<std::string, double> resources;
resources.emplace("resource1", 1);
TestActorTask(resources, true);
TestActorReconstruction(resources);
}
TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) {
TEST_F(SingleNodeTest, TestActorTaskLocalFailure) {
std::unordered_map<std::string, double> resources;
TestActorReconstruction(resources, true);
TestActorFailure(resources);
}
TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) {
TEST_F(TwoNodeTest, TestActorTaskCrossNodesFailure) {
std::unordered_map<std::string, double> resources;
resources.emplace("resource1", 1);
TestActorReconstruction(resources, true);
}
TEST_F(SingleNodeTest, TestDirectActorTaskLocalFailure) {
std::unordered_map<std::string, double> resources;
TestActorFailure(resources, true);
}
TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesFailure) {
std::unordered_map<std::string, double> resources;
resources.emplace("resource1", 1);
TestActorFailure(resources, true);
TestActorFailure(resources);
}
} // namespace ray
@@ -219,12 +219,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use";
const TaskSpecification task_spec(request.task_spec());
if (task_spec.IsActorTask() && !worker_context_.CurrentTaskIsDirectCall()) {
send_reply_callback(Status::Invalid("This actor doesn't accept direct calls."),
nullptr, nullptr);
return;
}
std::vector<ObjectID> dependencies;
for (size_t i = 0; i < task_spec.NumArgs(); ++i) {
int count = task_spec.ArgIdCount(i);
+1 -5
View File
@@ -48,11 +48,8 @@ message ActorHandle {
// TODO: Remove this once scheduling is done by task counter only.
bytes actor_cursor = 7;
// Whether direct actor call is used.
bool is_direct_call = 8;
// An extension field that is used for storing app-language-specific data.
bytes extension_data = 9;
bytes extension_data = 8;
}
message AssignTaskRequest {
@@ -101,7 +98,6 @@ message PushTaskRequest {
// will guarantee tasks execute in this sequence, waiting for any
// out-of-order request messages to arrive as necessary.
// If set to -1, ordering is disabled and the task executes immediately.
// This mode of behaviour is used for direct task submission only.
int64 sequence_number = 4;
// The max sequence number the client has processed responses for. This
// is a performance optimization that allows the client to tell the server
+1 -1
View File
@@ -11,7 +11,7 @@ void Transport::SendInternal(std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num,
std::vector<ObjectID> &return_ids) {
std::unordered_map<std::string, double> resources;
TaskOptions options{return_num, true, resources};
TaskOptions options{return_num, resources};
char meta_data[3] = {'R', 'A', 'W'};
std::shared_ptr<LocalMemoryBuffer> meta =
+9 -12
View File
@@ -72,9 +72,8 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
}
std::string StartStore() {
std::string store_socket_name = ray::JoinPaths(
ray::GetUserTempDir(),
"store" + RandomObjectID().Hex());
std::string store_socket_name =
ray::JoinPaths(ray::GetUserTempDir(), "store" + RandomObjectID().Hex());
std::string store_pid = store_socket_name + ".pid";
std::string plasma_command = store_executable_ + " -m 10000000 -s " +
store_socket_name +
@@ -96,8 +95,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
std::string StartGcsServer(std::string redis_address) {
std::string gcs_server_socket_name = ray::JoinPaths(
ray::GetUserTempDir(),
"gcs_server" + ObjectID::FromRandom().Hex());
ray::GetUserTempDir(), "gcs_server" + ObjectID::FromRandom().Hex());
std::string ray_start_cmd = gcs_server_executable_;
ray_start_cmd.append(" --redis_address=" + redis_address)
.append(" --redis_port=6379")
@@ -122,9 +120,8 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
std::string StartRaylet(std::string store_socket_name, std::string node_ip_address,
int port, std::string redis_address, std::string resource) {
std::string raylet_socket_name = ray::JoinPaths(
ray::GetUserTempDir(),
"raylet" + RandomObjectID().Hex());
std::string raylet_socket_name =
ray::JoinPaths(ray::GetUserTempDir(), "raylet" + RandomObjectID().Hex());
std::string ray_start_cmd = raylet_executable_;
ray_start_cmd.append(" --raylet_socket_name=" + raylet_socket_name)
.append(" --store_socket_name=" + store_socket_name)
@@ -174,7 +171,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(
msg.ToBytes(), nullptr, std::vector<ObjectID>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options{0, true, resources};
TaskOptions options{0, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("init", "", "", "")};
@@ -189,7 +186,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
args.emplace_back(TaskArg::PassByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options{0, true, resources};
TaskOptions options{0, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"execute_test", test, "", "")};
@@ -204,7 +201,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
args.emplace_back(TaskArg::PassByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>(), true)));
std::unordered_map<std::string, double> resources;
TaskOptions options{1, true, resources};
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"check_current_test_status", "", "", "")};
@@ -274,7 +271,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>())));
ActorCreationOptions actor_options{
max_reconstructions, is_direct_call,
max_reconstructions,
/*max_concurrency*/ 1, resources, resources, {},
/*is_detached*/ false, /*is_asyncio*/ false};