mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 03:16:55 +08:00
[core worker] add task submission & execution interface (#4922)
This commit is contained in:
+10
-1
@@ -114,6 +114,7 @@ cc_library(
|
||||
],
|
||||
exclude = [
|
||||
"src/ray/core_worker/*_test.cc",
|
||||
"src/ray/core_worker/mock_worker.cc",
|
||||
],
|
||||
),
|
||||
hdrs = glob([
|
||||
@@ -127,7 +128,15 @@ cc_library(
|
||||
],
|
||||
)
|
||||
|
||||
# This test is run by src/ray/test/run_core_worker_tests.sh
|
||||
cc_binary(
|
||||
name = "mock_worker",
|
||||
srcs = ["src/ray/core_worker/mock_worker.cc"],
|
||||
copts = COPTS,
|
||||
deps = [
|
||||
":core_worker_lib",
|
||||
],
|
||||
)
|
||||
|
||||
cc_binary(
|
||||
name = "core_worker_test",
|
||||
srcs = ["src/ray/core_worker/core_worker_test.cc"],
|
||||
|
||||
@@ -12,12 +12,12 @@ namespace ray {
|
||||
enum class WorkerType { WORKER, DRIVER };
|
||||
|
||||
/// Language of Ray tasks and workers.
|
||||
enum class Language { PYTHON, JAVA };
|
||||
enum class WorkerLanguage { PYTHON, JAVA };
|
||||
|
||||
/// Information about a remote function.
|
||||
struct RayFunction {
|
||||
/// Language of the remote function.
|
||||
const Language language;
|
||||
const WorkerLanguage language;
|
||||
/// Function descriptor of the remote function.
|
||||
const std::vector<std::string> function_descriptor;
|
||||
};
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
|
||||
#include "context.h"
|
||||
#include "ray/core_worker/context.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
@@ -23,7 +23,6 @@ struct WorkerThreadContext {
|
||||
void SetCurrentTask(const raylet::TaskSpecification &spec) {
|
||||
SetCurrentTask(spec.TaskId());
|
||||
}
|
||||
|
||||
private:
|
||||
/// The task ID for current task.
|
||||
TaskID current_task_id;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#ifndef RAY_CORE_WORKER_CONTEXT_H
|
||||
#define RAY_CORE_WORKER_CONTEXT_H
|
||||
|
||||
#include "common.h"
|
||||
#include "ray/core_worker/common.h"
|
||||
#include "ray/raylet/task_spec.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
#include "core_worker.h"
|
||||
#include "context.h"
|
||||
#include "ray/core_worker/core_worker.h"
|
||||
#include "ray/core_worker/context.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
CoreWorker::CoreWorker(const enum WorkerType worker_type, const enum Language language,
|
||||
CoreWorker::CoreWorker(const enum WorkerType worker_type,
|
||||
const enum WorkerLanguage language,
|
||||
const std::string &store_socket, const std::string &raylet_socket,
|
||||
DriverID driver_id)
|
||||
: worker_type_(worker_type),
|
||||
@@ -11,20 +12,28 @@ CoreWorker::CoreWorker(const enum WorkerType worker_type, const enum Language la
|
||||
worker_context_(worker_type, driver_id),
|
||||
store_socket_(store_socket),
|
||||
raylet_socket_(raylet_socket),
|
||||
is_initialized_(false),
|
||||
task_interface_(*this),
|
||||
object_interface_(*this),
|
||||
task_execution_interface_(*this) {}
|
||||
task_execution_interface_(*this) {
|
||||
switch (language_) {
|
||||
case ray::WorkerLanguage::JAVA:
|
||||
task_language_ = ::Language::JAVA;
|
||||
break;
|
||||
case ray::WorkerLanguage::PYTHON:
|
||||
task_language_ = ::Language::PYTHON;
|
||||
break;
|
||||
default:
|
||||
RAY_LOG(FATAL) << "Unsupported worker language: " << static_cast<int>(language_);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Status CoreWorker::Connect() {
|
||||
// connect to plasma.
|
||||
RAY_ARROW_RETURN_NOT_OK(store_client_.Connect(store_socket_));
|
||||
|
||||
// connect to raylet.
|
||||
::Language lang = ::Language::PYTHON;
|
||||
if (language_ == ray::Language::JAVA) {
|
||||
lang = ::Language::JAVA;
|
||||
}
|
||||
|
||||
// TODO: currently RayletClient would crash in its constructor if it cannot
|
||||
// connect to Raylet after a number of retries, this needs to be changed
|
||||
// so that the worker (java/python .etc) can retrieve and handle the error
|
||||
@@ -32,7 +41,8 @@ Status CoreWorker::Connect() {
|
||||
raylet_client_ = std::unique_ptr<RayletClient>(
|
||||
new RayletClient(raylet_socket_, worker_context_.GetWorkerID(),
|
||||
(worker_type_ == ray::WorkerType::WORKER),
|
||||
worker_context_.GetCurrentDriverID(), lang));
|
||||
worker_context_.GetCurrentDriverID(), task_language_));
|
||||
is_initialized_ = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
#ifndef RAY_CORE_WORKER_CORE_WORKER_H
|
||||
#define RAY_CORE_WORKER_CORE_WORKER_H
|
||||
|
||||
#include "common.h"
|
||||
#include "context.h"
|
||||
#include "object_interface.h"
|
||||
#include "ray/common/buffer.h"
|
||||
#include "ray/core_worker/common.h"
|
||||
#include "ray/core_worker/context.h"
|
||||
#include "ray/core_worker/object_interface.h"
|
||||
#include "ray/core_worker/task_execution.h"
|
||||
#include "ray/core_worker/task_interface.h"
|
||||
#include "ray/raylet/raylet_client.h"
|
||||
#include "task_execution.h"
|
||||
#include "task_interface.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
@@ -20,7 +20,7 @@ class CoreWorker {
|
||||
///
|
||||
/// \param[in] worker_type Type of this worker.
|
||||
/// \param[in] langauge Language of this worker.
|
||||
CoreWorker(const WorkerType worker_type, const Language language,
|
||||
CoreWorker(const WorkerType worker_type, const WorkerLanguage language,
|
||||
const std::string &store_socket, const std::string &raylet_socket,
|
||||
DriverID driver_id = DriverID::Nil());
|
||||
|
||||
@@ -31,7 +31,7 @@ class CoreWorker {
|
||||
enum WorkerType WorkerType() const { return worker_type_; }
|
||||
|
||||
/// Language of this worker.
|
||||
enum Language Language() const { return language_; }
|
||||
enum WorkerLanguage Language() const { return language_; }
|
||||
|
||||
/// Return the `CoreWorkerTaskInterface` that contains the methods related to task
|
||||
/// submisson.
|
||||
@@ -50,7 +50,10 @@ class CoreWorker {
|
||||
const enum WorkerType worker_type_;
|
||||
|
||||
/// Language of this worker.
|
||||
const enum Language language_;
|
||||
const enum WorkerLanguage language_;
|
||||
|
||||
/// Language of this worker as specified in flatbuf (used by task spec).
|
||||
::Language task_language_;
|
||||
|
||||
/// Worker context per thread.
|
||||
WorkerContext worker_context_;
|
||||
@@ -64,9 +67,15 @@ class CoreWorker {
|
||||
/// Plasma store client.
|
||||
plasma::PlasmaClient store_client_;
|
||||
|
||||
/// Mutex to protect store_client_.
|
||||
std::mutex store_client_mutex_;
|
||||
|
||||
/// Raylet client.
|
||||
std::unique_ptr<RayletClient> raylet_client_;
|
||||
|
||||
/// Whether this worker has been initialized.
|
||||
bool is_initialized_;
|
||||
|
||||
/// The `CoreWorkerTaskInterface` instance.
|
||||
CoreWorkerTaskInterface task_interface_;
|
||||
|
||||
|
||||
@@ -2,9 +2,9 @@
|
||||
#include "gmock/gmock.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "context.h"
|
||||
#include "core_worker.h"
|
||||
#include "ray/common/buffer.h"
|
||||
#include "ray/core_worker/context.h"
|
||||
#include "ray/core_worker/core_worker.h"
|
||||
#include "ray/raylet/raylet_client.h"
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
@@ -18,6 +18,7 @@ namespace ray {
|
||||
|
||||
std::string store_executable;
|
||||
std::string raylet_executable;
|
||||
std::string mock_worker_executable;
|
||||
|
||||
ray::ObjectID RandomObjectID() { return ObjectID::FromRandom(); }
|
||||
|
||||
@@ -32,6 +33,9 @@ static void flushall_redis(void) {
|
||||
class CoreWorkerTest : public ::testing::Test {
|
||||
public:
|
||||
CoreWorkerTest(int num_nodes) {
|
||||
// flush redis first.
|
||||
flushall_redis();
|
||||
|
||||
RAY_CHECK(num_nodes >= 0);
|
||||
if (num_nodes > 0) {
|
||||
raylet_socket_names_.resize(num_nodes);
|
||||
@@ -43,10 +47,12 @@ class CoreWorkerTest : public ::testing::Test {
|
||||
store_socket = StartStore();
|
||||
}
|
||||
|
||||
// start raylet on each node
|
||||
// start raylet on each node. Assign each node with different resources so that
|
||||
// a task can be scheduled to the desired node.
|
||||
for (int i = 0; i < num_nodes; i++) {
|
||||
raylet_socket_names_[i] = StartRaylet(raylet_store_socket_names_[i], "127.0.0.1",
|
||||
"127.0.0.1", "\"CPU,4.0\"");
|
||||
raylet_socket_names_[i] =
|
||||
StartRaylet(raylet_store_socket_names_[i], "127.0.0.1", "127.0.0.1",
|
||||
"\"CPU,4.0,resource" + std::to_string(i) + ",10\"");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +72,7 @@ class CoreWorkerTest : public ::testing::Test {
|
||||
std::string plasma_command = store_executable + " -m 10000000 -s " +
|
||||
store_socket_name +
|
||||
" 1> /dev/null 2> /dev/null & echo $! > " + store_pid;
|
||||
RAY_LOG(INFO) << plasma_command;
|
||||
RAY_LOG(DEBUG) << plasma_command;
|
||||
RAY_CHECK(system(plasma_command.c_str()) == 0);
|
||||
usleep(200 * 1000);
|
||||
return store_socket_name;
|
||||
@@ -75,7 +81,7 @@ class CoreWorkerTest : public ::testing::Test {
|
||||
void StopStore(std::string store_socket_name) {
|
||||
std::string store_pid = store_socket_name + ".pid";
|
||||
std::string kill_9 = "kill -9 `cat " + store_pid + "`";
|
||||
RAY_LOG(INFO) << kill_9;
|
||||
RAY_LOG(DEBUG) << kill_9;
|
||||
ASSERT_TRUE(system(kill_9.c_str()) == 0);
|
||||
ASSERT_TRUE(system(("rm -rf " + store_socket_name).c_str()) == 0);
|
||||
ASSERT_TRUE(system(("rm -rf " + store_socket_name + ".pid").c_str()) == 0);
|
||||
@@ -91,13 +97,14 @@ class CoreWorkerTest : public ::testing::Test {
|
||||
.append(" --node_ip_address=" + node_ip_address)
|
||||
.append(" --redis_address=" + redis_address)
|
||||
.append(" --redis_port=6379")
|
||||
.append(" --num_initial_workers=0")
|
||||
.append(" --num_initial_workers=1")
|
||||
.append(" --maximum_startup_concurrency=10")
|
||||
.append(" --static_resource_list=" + resource)
|
||||
.append(" --python_worker_command=NoneCmd")
|
||||
.append(" --python_worker_command=\"" + mock_worker_executable + " " +
|
||||
store_socket_name + " " + raylet_socket_name + "\"")
|
||||
.append(" & echo $! > " + raylet_socket_name + ".pid");
|
||||
|
||||
RAY_LOG(INFO) << "Ray Start command: " << ray_start_cmd;
|
||||
RAY_LOG(DEBUG) << "Ray Start command: " << ray_start_cmd;
|
||||
RAY_CHECK(system(ray_start_cmd.c_str()) == 0);
|
||||
usleep(200 * 1000);
|
||||
return raylet_socket_name;
|
||||
@@ -106,16 +113,134 @@ class CoreWorkerTest : public ::testing::Test {
|
||||
void StopRaylet(std::string raylet_socket_name) {
|
||||
std::string raylet_pid = raylet_socket_name + ".pid";
|
||||
std::string kill_9 = "kill -9 `cat " + raylet_pid + "`";
|
||||
RAY_LOG(INFO) << kill_9;
|
||||
RAY_LOG(DEBUG) << kill_9;
|
||||
ASSERT_TRUE(system(kill_9.c_str()) == 0);
|
||||
ASSERT_TRUE(system(("rm -rf " + raylet_socket_name).c_str()) == 0);
|
||||
ASSERT_TRUE(system(("rm -rf " + raylet_socket_name + ".pid").c_str()) == 0);
|
||||
}
|
||||
|
||||
void SetUp() { flushall_redis(); }
|
||||
void SetUp() {}
|
||||
|
||||
void TearDown() {}
|
||||
|
||||
void TestNormalTask(const std::unordered_map<std::string, double> &resources) {
|
||||
CoreWorker driver(WorkerType::DRIVER, WorkerLanguage::PYTHON,
|
||||
raylet_store_socket_names_[0], raylet_socket_names_[0],
|
||||
DriverID::FromRandom());
|
||||
|
||||
RAY_CHECK_OK(driver.Connect());
|
||||
|
||||
// Test pass by value.
|
||||
{
|
||||
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
|
||||
|
||||
auto buffer1 = std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1));
|
||||
|
||||
RayFunction func{ray::WorkerLanguage::PYTHON, {}};
|
||||
std::vector<TaskArg> args;
|
||||
args.emplace_back(TaskArg::PassByValue(buffer1));
|
||||
|
||||
TaskOptions options;
|
||||
|
||||
std::vector<ObjectID> return_ids;
|
||||
RAY_CHECK_OK(driver.Tasks().SubmitTask(func, args, options, &return_ids));
|
||||
|
||||
ASSERT_EQ(return_ids.size(), 1);
|
||||
|
||||
std::vector<std::shared_ptr<ray::Buffer>> results;
|
||||
RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results));
|
||||
|
||||
ASSERT_EQ(results.size(), 1);
|
||||
ASSERT_EQ(results[0]->Size(), buffer1->Size());
|
||||
ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0);
|
||||
}
|
||||
|
||||
// Test pass by reference.
|
||||
{
|
||||
uint8_t array1[] = {10, 11, 12, 13, 14, 15};
|
||||
auto buffer1 = std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1));
|
||||
|
||||
ObjectID object_id;
|
||||
RAY_CHECK_OK(driver.Objects().Put(*buffer1, &object_id));
|
||||
|
||||
std::vector<TaskArg> args;
|
||||
args.emplace_back(TaskArg::PassByReference(object_id));
|
||||
|
||||
RayFunction func{ray::WorkerLanguage::PYTHON, {}};
|
||||
TaskOptions options;
|
||||
|
||||
std::vector<ObjectID> return_ids;
|
||||
RAY_CHECK_OK(driver.Tasks().SubmitTask(func, args, options, &return_ids));
|
||||
|
||||
ASSERT_EQ(return_ids.size(), 1);
|
||||
|
||||
std::vector<std::shared_ptr<ray::Buffer>> results;
|
||||
RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results));
|
||||
|
||||
ASSERT_EQ(results.size(), 1);
|
||||
ASSERT_EQ(results[0]->Size(), buffer1->Size());
|
||||
ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0);
|
||||
}
|
||||
}
|
||||
|
||||
void TestActorTask(const std::unordered_map<std::string, double> &resources) {
|
||||
CoreWorker driver(WorkerType::DRIVER, WorkerLanguage::PYTHON,
|
||||
raylet_store_socket_names_[0], raylet_socket_names_[0],
|
||||
DriverID::FromRandom());
|
||||
RAY_CHECK_OK(driver.Connect());
|
||||
|
||||
std::unique_ptr<ActorHandle> actor_handle;
|
||||
|
||||
// Test creating actor.
|
||||
{
|
||||
uint8_t array[] = {1, 2, 3};
|
||||
auto buffer = std::make_shared<LocalMemoryBuffer>(array, sizeof(array));
|
||||
|
||||
RayFunction func{ray::WorkerLanguage::PYTHON, {}};
|
||||
std::vector<TaskArg> args;
|
||||
args.emplace_back(TaskArg::PassByValue(buffer));
|
||||
|
||||
ActorCreationOptions actor_options{0, resources};
|
||||
|
||||
// Create an actor.
|
||||
RAY_CHECK_OK(driver.Tasks().CreateActor(func, args, actor_options, &actor_handle));
|
||||
}
|
||||
|
||||
// Test submitting a task for that actor.
|
||||
{
|
||||
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
|
||||
uint8_t array2[] = {10, 11, 12, 13, 14, 15};
|
||||
|
||||
auto buffer1 = std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1));
|
||||
auto buffer2 = std::make_shared<LocalMemoryBuffer>(array2, sizeof(array2));
|
||||
|
||||
ObjectID object_id;
|
||||
RAY_CHECK_OK(driver.Objects().Put(*buffer1, &object_id));
|
||||
|
||||
// Create arguments with PassByRef and PassByValue.
|
||||
std::vector<TaskArg> args;
|
||||
args.emplace_back(TaskArg::PassByReference(object_id));
|
||||
args.emplace_back(TaskArg::PassByValue(buffer2));
|
||||
|
||||
TaskOptions options{1, resources};
|
||||
std::vector<ObjectID> return_ids;
|
||||
RayFunction func{ray::WorkerLanguage::PYTHON, {}};
|
||||
RAY_CHECK_OK(driver.Tasks().SubmitActorTask(*actor_handle, func, args, options,
|
||||
&return_ids));
|
||||
RAY_CHECK(return_ids.size() == 1);
|
||||
|
||||
std::vector<std::shared_ptr<ray::Buffer>> results;
|
||||
RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results));
|
||||
|
||||
ASSERT_EQ(results.size(), 1);
|
||||
ASSERT_EQ(results[0]->Size(), buffer1->Size() + buffer2->Size());
|
||||
ASSERT_EQ(memcmp(results[0]->Data(), buffer1->Data(), buffer1->Size()), 0);
|
||||
ASSERT_EQ(
|
||||
memcmp(results[0]->Data() + buffer1->Size(), buffer2->Data(), buffer2->Size()),
|
||||
0);
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
std::vector<std::string> raylet_socket_names_;
|
||||
std::vector<std::string> raylet_store_socket_names_;
|
||||
@@ -131,6 +256,11 @@ class SingleNodeTest : public CoreWorkerTest {
|
||||
SingleNodeTest() : CoreWorkerTest(1) {}
|
||||
};
|
||||
|
||||
class TwoNodeTest : public CoreWorkerTest {
|
||||
public:
|
||||
TwoNodeTest() : CoreWorkerTest(2) {}
|
||||
};
|
||||
|
||||
TEST_F(ZeroNodeTest, TestTaskArg) {
|
||||
// Test by-reference argument.
|
||||
ObjectID id = ObjectID::FromRandom();
|
||||
@@ -148,10 +278,10 @@ TEST_F(ZeroNodeTest, TestTaskArg) {
|
||||
}
|
||||
|
||||
TEST_F(ZeroNodeTest, TestAttributeGetters) {
|
||||
CoreWorker core_worker(WorkerType::DRIVER, Language::PYTHON, "", "",
|
||||
CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON, "", "",
|
||||
DriverID::FromRandom());
|
||||
ASSERT_EQ(core_worker.WorkerType(), WorkerType::DRIVER);
|
||||
ASSERT_EQ(core_worker.Language(), Language::PYTHON);
|
||||
ASSERT_EQ(core_worker.Language(), WorkerLanguage::PYTHON);
|
||||
}
|
||||
|
||||
TEST_F(ZeroNodeTest, TestWorkerContext) {
|
||||
@@ -180,7 +310,7 @@ TEST_F(ZeroNodeTest, TestWorkerContext) {
|
||||
}
|
||||
|
||||
TEST_F(SingleNodeTest, TestObjectInterface) {
|
||||
CoreWorker core_worker(WorkerType::DRIVER, Language::PYTHON,
|
||||
CoreWorker core_worker(WorkerType::DRIVER, WorkerLanguage::PYTHON,
|
||||
raylet_store_socket_names_[0], raylet_socket_names_[0],
|
||||
DriverID::FromRandom());
|
||||
RAY_CHECK_OK(core_worker.Connect());
|
||||
@@ -193,16 +323,16 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
|
||||
buffers.emplace_back(array2, sizeof(array2));
|
||||
|
||||
std::vector<ObjectID> ids(buffers.size());
|
||||
for (int i = 0; i < ids.size(); i++) {
|
||||
core_worker.Objects().Put(buffers[i], &ids[i]);
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
RAY_CHECK_OK(core_worker.Objects().Put(buffers[i], &ids[i]));
|
||||
}
|
||||
|
||||
// Test Get().
|
||||
std::vector<std::shared_ptr<Buffer>> results;
|
||||
core_worker.Objects().Get(ids, 0, &results);
|
||||
RAY_CHECK_OK(core_worker.Objects().Get(ids, -1, &results));
|
||||
|
||||
ASSERT_EQ(results.size(), 2);
|
||||
for (int i = 0; i < ids.size(); i++) {
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
ASSERT_EQ(results[i]->Size(), buffers[i].Size());
|
||||
ASSERT_EQ(memcmp(results[i]->Data(), buffers[i].Data(), buffers[i].Size()), 0);
|
||||
}
|
||||
@@ -213,34 +343,126 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
|
||||
all_ids.push_back(non_existent_id);
|
||||
|
||||
std::vector<bool> wait_results;
|
||||
core_worker.Objects().Wait(all_ids, 2, -1, &wait_results);
|
||||
RAY_CHECK_OK(core_worker.Objects().Wait(all_ids, 2, -1, &wait_results));
|
||||
ASSERT_EQ(wait_results.size(), 3);
|
||||
ASSERT_EQ(wait_results, std::vector<bool>({true, true, false}));
|
||||
|
||||
core_worker.Objects().Wait(all_ids, 3, 100, &wait_results);
|
||||
RAY_CHECK_OK(core_worker.Objects().Wait(all_ids, 3, 100, &wait_results));
|
||||
ASSERT_EQ(wait_results.size(), 3);
|
||||
ASSERT_EQ(wait_results, std::vector<bool>({true, true, false}));
|
||||
|
||||
// Test Delete().
|
||||
// clear the reference held by PlasmaBuffer.
|
||||
results.clear();
|
||||
core_worker.Objects().Delete(ids, true, false);
|
||||
RAY_CHECK_OK(core_worker.Objects().Delete(ids, true, false));
|
||||
|
||||
// Note that Delete() calls RayletClient::FreeObjects and would not
|
||||
// wait for objects being deleted, so wait a while for plasma store
|
||||
// to process the command.
|
||||
usleep(200 * 1000);
|
||||
core_worker.Objects().Get(ids, 0, &results);
|
||||
RAY_CHECK_OK(core_worker.Objects().Get(ids, 0, &results));
|
||||
ASSERT_EQ(results.size(), 2);
|
||||
ASSERT_TRUE(!results[0]);
|
||||
ASSERT_TRUE(!results[1]);
|
||||
}
|
||||
|
||||
TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) {
|
||||
CoreWorker worker1(WorkerType::DRIVER, WorkerLanguage::PYTHON,
|
||||
raylet_store_socket_names_[0], raylet_socket_names_[0],
|
||||
DriverID::FromRandom());
|
||||
RAY_CHECK_OK(worker1.Connect());
|
||||
|
||||
CoreWorker worker2(WorkerType::DRIVER, WorkerLanguage::PYTHON,
|
||||
raylet_store_socket_names_[1], raylet_socket_names_[1],
|
||||
DriverID::FromRandom());
|
||||
RAY_CHECK_OK(worker2.Connect());
|
||||
|
||||
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
|
||||
uint8_t array2[] = {10, 11, 12, 13, 14, 15};
|
||||
|
||||
std::vector<LocalMemoryBuffer> buffers;
|
||||
buffers.emplace_back(array1, sizeof(array1));
|
||||
buffers.emplace_back(array2, sizeof(array2));
|
||||
|
||||
std::vector<ObjectID> ids(buffers.size());
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
RAY_CHECK_OK(worker1.Objects().Put(buffers[i], &ids[i]));
|
||||
}
|
||||
|
||||
// Test Get() from remote node.
|
||||
std::vector<std::shared_ptr<Buffer>> results;
|
||||
RAY_CHECK_OK(worker2.Objects().Get(ids, -1, &results));
|
||||
|
||||
ASSERT_EQ(results.size(), 2);
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
ASSERT_EQ(results[i]->Size(), buffers[i].Size());
|
||||
ASSERT_EQ(memcmp(results[i]->Data(), buffers[i].Data(), buffers[i].Size()), 0);
|
||||
}
|
||||
|
||||
// Test Wait() from remote node.
|
||||
ObjectID non_existent_id = ObjectID::FromRandom();
|
||||
std::vector<ObjectID> all_ids(ids);
|
||||
all_ids.push_back(non_existent_id);
|
||||
|
||||
std::vector<bool> wait_results;
|
||||
RAY_CHECK_OK(worker2.Objects().Wait(all_ids, 2, -1, &wait_results));
|
||||
ASSERT_EQ(wait_results.size(), 3);
|
||||
ASSERT_EQ(wait_results, std::vector<bool>({true, true, false}));
|
||||
|
||||
RAY_CHECK_OK(worker2.Objects().Wait(all_ids, 3, 100, &wait_results));
|
||||
ASSERT_EQ(wait_results.size(), 3);
|
||||
ASSERT_EQ(wait_results, std::vector<bool>({true, true, false}));
|
||||
|
||||
// Test Delete() from all machines.
|
||||
// clear the reference held by PlasmaBuffer.
|
||||
results.clear();
|
||||
RAY_CHECK_OK(worker2.Objects().Delete(ids, false, false));
|
||||
|
||||
// Note that Delete() calls RayletClient::FreeObjects and would not
|
||||
// wait for objects being deleted, so wait a while for plasma store
|
||||
// to process the command.
|
||||
usleep(1000 * 1000);
|
||||
// Verify objects are deleted from both machines.
|
||||
RAY_CHECK_OK(worker2.Objects().Get(ids, 0, &results));
|
||||
ASSERT_EQ(results.size(), 2);
|
||||
ASSERT_TRUE(!results[0]);
|
||||
ASSERT_TRUE(!results[1]);
|
||||
|
||||
RAY_CHECK_OK(worker1.Objects().Get(ids, 0, &results));
|
||||
ASSERT_EQ(results.size(), 2);
|
||||
ASSERT_TRUE(!results[0]);
|
||||
ASSERT_TRUE(!results[1]);
|
||||
}
|
||||
|
||||
TEST_F(SingleNodeTest, TestNormalTaskLocal) {
|
||||
std::unordered_map<std::string, double> resources;
|
||||
TestNormalTask(resources);
|
||||
}
|
||||
|
||||
TEST_F(TwoNodeTest, TestNormalTaskCrossNodes) {
|
||||
std::unordered_map<std::string, double> resources;
|
||||
resources.emplace("resource1", 1);
|
||||
TestNormalTask(resources);
|
||||
}
|
||||
|
||||
TEST_F(SingleNodeTest, TestActorTaskLocal) {
|
||||
std::unordered_map<std::string, double> resources;
|
||||
TestActorTask(resources);
|
||||
}
|
||||
|
||||
TEST_F(TwoNodeTest, TestActorTaskCrossNodes) {
|
||||
std::unordered_map<std::string, double> resources;
|
||||
resources.emplace("resource1", 1);
|
||||
TestActorTask(resources);
|
||||
}
|
||||
|
||||
} // namespace ray
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
RAY_CHECK(argc == 4);
|
||||
ray::store_executable = std::string(argv[1]);
|
||||
ray::raylet_executable = std::string(argv[2]);
|
||||
ray::mock_worker_executable = std::string(argv[3]);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
#include "ray/core_worker/context.h"
|
||||
#include "ray/core_worker/core_worker.h"
|
||||
#include "ray/core_worker/task_execution.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
/// A mock C++ worker used by core_worker_test.cc to verify the task submission/execution
|
||||
/// interfaces in both single node and cross-nodes scenarios. As the raylet client can
|
||||
/// only
|
||||
/// be called by a real worker process, core_worker_test.cc has to use this program binary
|
||||
/// to start the actual worker process, in the test, the task submission interfaces are
|
||||
/// called
|
||||
/// in core_worker_test, and task execution interfaces are called in this file, see that
|
||||
/// test
|
||||
/// for more details on how this class is used.
|
||||
class MockWorker {
|
||||
public:
|
||||
MockWorker(const std::string &store_socket, const std::string &raylet_socket)
|
||||
: worker_(WorkerType::WORKER, WorkerLanguage::PYTHON, store_socket, raylet_socket,
|
||||
DriverID::FromRandom()) {
|
||||
RAY_CHECK_OK(worker_.Connect());
|
||||
}
|
||||
|
||||
void Run() {
|
||||
auto executor_func = [this](const RayFunction &ray_function,
|
||||
const std::vector<std::shared_ptr<Buffer>> &args,
|
||||
const TaskID &task_id, int num_returns) {
|
||||
|
||||
// Note that this doesn't include dummy object id.
|
||||
RAY_CHECK(num_returns >= 0);
|
||||
|
||||
// Merge all the content from input args.
|
||||
std::vector<uint8_t> buffer;
|
||||
for (const auto &arg : args) {
|
||||
buffer.insert(buffer.end(), arg->Data(), arg->Data() + arg->Size());
|
||||
}
|
||||
|
||||
LocalMemoryBuffer memory_buffer(buffer.data(), buffer.size());
|
||||
|
||||
// Write the merged content to each of return ids.
|
||||
for (int i = 0; i < num_returns; i++) {
|
||||
ObjectID id = ObjectID::ForTaskReturn(task_id, i + 1);
|
||||
RAY_CHECK_OK(worker_.Objects().Put(memory_buffer, id));
|
||||
}
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
// Start executing tasks.
|
||||
worker_.Execution().Run(executor_func);
|
||||
}
|
||||
|
||||
private:
|
||||
CoreWorker worker_;
|
||||
};
|
||||
|
||||
} // namespace ray
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
RAY_CHECK(argc == 3);
|
||||
auto store_socket = std::string(argv[1]);
|
||||
auto raylet_socket = std::string(argv[2]);
|
||||
|
||||
ray::MockWorker worker(store_socket, raylet_socket);
|
||||
worker.Run();
|
||||
return 0;
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
#include "object_interface.h"
|
||||
#include "context.h"
|
||||
#include "core_worker.h"
|
||||
#include "ray/core_worker/object_interface.h"
|
||||
#include "ray/common/ray_config.h"
|
||||
#include "ray/core_worker/context.h"
|
||||
#include "ray/core_worker/core_worker.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
@@ -12,14 +12,25 @@ Status CoreWorkerObjectInterface::Put(const Buffer &buffer, ObjectID *object_id)
|
||||
ObjectID put_id = ObjectID::ForPut(core_worker_.worker_context_.GetCurrentTaskID(),
|
||||
core_worker_.worker_context_.GetNextPutIndex());
|
||||
*object_id = put_id;
|
||||
return Put(buffer, put_id);
|
||||
}
|
||||
|
||||
auto plasma_id = put_id.ToPlasmaId();
|
||||
Status CoreWorkerObjectInterface::Put(const Buffer &buffer, const ObjectID &object_id) {
|
||||
auto plasma_id = object_id.ToPlasmaId();
|
||||
std::shared_ptr<arrow::Buffer> data;
|
||||
RAY_ARROW_RETURN_NOT_OK(
|
||||
core_worker_.store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data));
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(core_worker_.store_client_mutex_);
|
||||
RAY_ARROW_RETURN_NOT_OK(
|
||||
core_worker_.store_client_.Create(plasma_id, buffer.Size(), nullptr, 0, &data));
|
||||
}
|
||||
|
||||
memcpy(data->mutable_data(), buffer.Data(), buffer.Size());
|
||||
RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Seal(plasma_id));
|
||||
RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Release(plasma_id));
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(core_worker_.store_client_mutex_);
|
||||
RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Seal(plasma_id));
|
||||
RAY_ARROW_RETURN_NOT_OK(core_worker_.store_client_.Release(plasma_id));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -31,7 +42,7 @@ Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
|
||||
bool was_blocked = false;
|
||||
|
||||
std::unordered_map<ObjectID, int> unready;
|
||||
for (int i = 0; i < ids.size(); i++) {
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
unready.insert({ids[i], i});
|
||||
}
|
||||
|
||||
@@ -73,10 +84,13 @@ Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
|
||||
}
|
||||
|
||||
std::vector<plasma::ObjectBuffer> object_buffers;
|
||||
auto status =
|
||||
core_worker_.store_client_.Get(plasma_ids, get_timeout, &object_buffers);
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(core_worker_.store_client_mutex_);
|
||||
auto status =
|
||||
core_worker_.store_client_.Get(plasma_ids, get_timeout, &object_buffers);
|
||||
}
|
||||
|
||||
for (int i = 0; i < object_buffers.size(); i++) {
|
||||
for (size_t i = 0; i < object_buffers.size(); i++) {
|
||||
if (object_buffers[i].data != nullptr) {
|
||||
const auto &object_id = unready_ids[i];
|
||||
(*results)[unready[object_id]] =
|
||||
@@ -112,7 +126,7 @@ Status CoreWorkerObjectInterface::Wait(const std::vector<ObjectID> &object_ids,
|
||||
// TODO: change RayletClient::Wait() to return a bit set, so that we don't need
|
||||
// to do this translation.
|
||||
(*results).resize(object_ids.size());
|
||||
for (int i = 0; i < object_ids.size(); i++) {
|
||||
for (size_t i = 0; i < object_ids.size(); i++) {
|
||||
(*results)[i] = ready_ids.count(object_ids[i]) > 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
#ifndef RAY_CORE_WORKER_OBJECT_INTERFACE_H
|
||||
#define RAY_CORE_WORKER_OBJECT_INTERFACE_H
|
||||
|
||||
#include "common.h"
|
||||
#include "plasma/client.h"
|
||||
#include "ray/common/buffer.h"
|
||||
#include "ray/common/id.h"
|
||||
#include "ray/common/status.h"
|
||||
#include "ray/core_worker/common.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
@@ -23,6 +23,13 @@ class CoreWorkerObjectInterface {
|
||||
/// \return Status.
|
||||
Status Put(const Buffer &buffer, ObjectID *object_id);
|
||||
|
||||
/// Put an object with specified ID into object store.
|
||||
///
|
||||
/// \param[in] buffer Data buffer of the object.
|
||||
/// \param[in] object_id Object ID specified by user.
|
||||
/// \return Status.
|
||||
Status Put(const Buffer &buffer, const ObjectID &object_id);
|
||||
|
||||
/// Get a list of objects from the object store.
|
||||
///
|
||||
/// \param[in] ids IDs of the objects to get.
|
||||
|
||||
@@ -1,7 +1,80 @@
|
||||
#include "task_execution.h"
|
||||
#include "ray/core_worker/task_execution.h"
|
||||
#include "ray/core_worker/context.h"
|
||||
#include "ray/core_worker/core_worker.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
void CoreWorkerTaskExecutionInterface::Start(const TaskExecutor &executor) {}
|
||||
Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) {
|
||||
RAY_CHECK(core_worker_.is_initialized_);
|
||||
|
||||
while (true) {
|
||||
std::unique_ptr<raylet::TaskSpecification> task_spec;
|
||||
auto status = core_worker_.raylet_client_->GetTask(&task_spec);
|
||||
if (!status.ok()) {
|
||||
RAY_LOG(ERROR) << "Get task failed with error: "
|
||||
<< ray::Status::IOError(status.message());
|
||||
return status;
|
||||
}
|
||||
|
||||
const auto &spec = *task_spec;
|
||||
core_worker_.worker_context_.SetCurrentTask(spec);
|
||||
|
||||
WorkerLanguage language = (spec.GetLanguage() == ::Language::JAVA)
|
||||
? WorkerLanguage::JAVA
|
||||
: WorkerLanguage::PYTHON;
|
||||
RayFunction func{language, spec.FunctionDescriptor()};
|
||||
|
||||
std::vector<std::shared_ptr<Buffer>> args;
|
||||
RAY_CHECK_OK(BuildArgsForExecutor(spec, &args));
|
||||
|
||||
auto num_returns = spec.NumReturns();
|
||||
if (spec.IsActorCreationTask() || spec.IsActorTask()) {
|
||||
RAY_CHECK(num_returns > 0);
|
||||
// Decrease to account for the dummy object id.
|
||||
num_returns--;
|
||||
}
|
||||
|
||||
status = executor(func, args, spec.TaskId(), num_returns);
|
||||
// TODO:
|
||||
// 1. Check and handle failure.
|
||||
// 2. Save or load checkpoint.
|
||||
}
|
||||
|
||||
// should never reach here.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor(
|
||||
const raylet::TaskSpecification &spec, std::vector<std::shared_ptr<Buffer>> *args) {
|
||||
auto num_args = spec.NumArgs();
|
||||
(*args).resize(num_args);
|
||||
|
||||
std::vector<ObjectID> object_ids_to_fetch;
|
||||
std::vector<int> indices;
|
||||
|
||||
for (int i = 0; i < spec.NumArgs(); ++i) {
|
||||
int count = spec.ArgIdCount(i);
|
||||
if (count > 0) {
|
||||
// pass by reference.
|
||||
RAY_CHECK(count == 1);
|
||||
object_ids_to_fetch.push_back(spec.ArgId(i, 0));
|
||||
indices.push_back(i);
|
||||
} else {
|
||||
// pass by value.
|
||||
(*args)[i] = std::make_shared<LocalMemoryBuffer>(
|
||||
const_cast<uint8_t *>(spec.ArgVal(i)), spec.ArgValLength(i));
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<Buffer>> results;
|
||||
auto status = core_worker_.object_interface_.Get(object_ids_to_fetch, -1, &results);
|
||||
if (status.ok()) {
|
||||
for (size_t i = 0; i < results.size(); i++) {
|
||||
(*args)[indices[i]] = results[i];
|
||||
}
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
} // namespace ray
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
#ifndef RAY_CORE_WORKER_TASK_EXECUTION_H
|
||||
#define RAY_CORE_WORKER_TASK_EXECUTION_H
|
||||
|
||||
#include "common.h"
|
||||
#include "ray/common/buffer.h"
|
||||
#include "ray/common/status.h"
|
||||
#include "ray/core_worker/common.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
class CoreWorker;
|
||||
|
||||
namespace raylet {
|
||||
class TaskSpecification;
|
||||
}
|
||||
|
||||
/// The interface that contains all `CoreWorker` methods that are related to task
|
||||
/// execution.
|
||||
class CoreWorkerTaskExecutionInterface {
|
||||
@@ -20,13 +24,26 @@ class CoreWorkerTaskExecutionInterface {
|
||||
/// \param ray_function[in] Information about the function to execute.
|
||||
/// \param args[in] Arguments of the task.
|
||||
/// \return Status.
|
||||
using TaskExecutor = std::function<Status(const RayFunction &ray_function,
|
||||
const std::vector<Buffer> &args)>;
|
||||
using TaskExecutor = std::function<Status(
|
||||
const RayFunction &ray_function, const std::vector<std::shared_ptr<Buffer>> &args,
|
||||
const TaskID &task_id, int num_returns)>;
|
||||
|
||||
/// Start receving and executes tasks in a infinite loop.
|
||||
void Start(const TaskExecutor &executor);
|
||||
/// \return Status.
|
||||
Status Run(const TaskExecutor &executor);
|
||||
|
||||
private:
|
||||
/// Build arguments for task executor. This would loop through all the arguments
|
||||
/// in task spec, and for each of them that's passed by reference (ObjectID),
|
||||
/// fetch its content from store and; for arguments that are passed by value,
|
||||
/// just copy their content.
|
||||
///
|
||||
/// \param spec[in] Task specification.
|
||||
/// \param args[out] The arguments for passing to task executor.
|
||||
///
|
||||
Status BuildArgsForExecutor(const raylet::TaskSpecification &spec,
|
||||
std::vector<std::shared_ptr<Buffer>> *args);
|
||||
|
||||
/// Reference to the parent CoreWorker instance.
|
||||
CoreWorker &core_worker_;
|
||||
};
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
#include "task_interface.h"
|
||||
#include "ray/raylet/task.h"
|
||||
#include "ray/core_worker/context.h"
|
||||
#include "ray/core_worker/core_worker.h"
|
||||
#include "ray/core_worker/task_interface.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
@@ -6,13 +9,61 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function,
|
||||
const std::vector<TaskArg> &args,
|
||||
const TaskOptions &task_options,
|
||||
std::vector<ObjectID> *return_ids) {
|
||||
return Status::OK();
|
||||
auto &context = core_worker_.worker_context_;
|
||||
auto next_task_index = context.GetNextTaskIndex();
|
||||
const auto task_id = GenerateTaskId(context.GetCurrentDriverID(),
|
||||
context.GetCurrentTaskID(), next_task_index);
|
||||
|
||||
auto num_returns = task_options.num_returns;
|
||||
(*return_ids).resize(num_returns);
|
||||
for (int i = 0; i < num_returns; i++) {
|
||||
(*return_ids)[i] = ObjectID::ForTaskReturn(task_id, i + 1);
|
||||
}
|
||||
|
||||
auto task_arguments = BuildTaskArguments(args);
|
||||
auto language = ToTaskLanguage(function.language);
|
||||
|
||||
ray::raylet::TaskSpecification spec(context.GetCurrentDriverID(),
|
||||
context.GetCurrentTaskID(), next_task_index,
|
||||
task_arguments, num_returns, task_options.resources,
|
||||
language, function.function_descriptor);
|
||||
|
||||
std::vector<ObjectID> execution_dependencies;
|
||||
return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec);
|
||||
}
|
||||
|
||||
Status CoreWorkerTaskInterface::CreateActor(
|
||||
const RayFunction &function, const std::vector<TaskArg> &args,
|
||||
const ActorCreationOptions &actor_creation_options, ActorHandle *actor_handle) {
|
||||
return Status::OK();
|
||||
const ActorCreationOptions &actor_creation_options,
|
||||
std::unique_ptr<ActorHandle> *actor_handle) {
|
||||
auto &context = core_worker_.worker_context_;
|
||||
auto next_task_index = context.GetNextTaskIndex();
|
||||
const auto task_id = GenerateTaskId(context.GetCurrentDriverID(),
|
||||
context.GetCurrentTaskID(), next_task_index);
|
||||
|
||||
std::vector<ObjectID> return_ids;
|
||||
return_ids.push_back(ObjectID::ForTaskReturn(task_id, 1));
|
||||
ActorID actor_creation_id = ActorID::FromBinary(return_ids[0].Binary());
|
||||
|
||||
*actor_handle = std::unique_ptr<ActorHandle>(
|
||||
new ActorHandle(actor_creation_id, ActorHandleID::Nil()));
|
||||
(*actor_handle)->IncreaseTaskCounter();
|
||||
(*actor_handle)->SetActorCursor(return_ids[0]);
|
||||
|
||||
auto task_arguments = BuildTaskArguments(args);
|
||||
auto language = ToTaskLanguage(function.language);
|
||||
|
||||
// Note that the caller is supposed to specify required placement resources
|
||||
// correctly via actor_creation_options.resources.
|
||||
ray::raylet::TaskSpecification spec(
|
||||
context.GetCurrentDriverID(), context.GetCurrentTaskID(), next_task_index,
|
||||
actor_creation_id, ObjectID::Nil(), actor_creation_options.max_reconstructions,
|
||||
ActorID::Nil(), ActorHandleID::Nil(), 0, {}, task_arguments, 1,
|
||||
actor_creation_options.resources, actor_creation_options.resources, language,
|
||||
function.function_descriptor);
|
||||
|
||||
std::vector<ObjectID> execution_dependencies;
|
||||
return core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec);
|
||||
}
|
||||
|
||||
Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
|
||||
@@ -20,7 +71,75 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
|
||||
const std::vector<TaskArg> &args,
|
||||
const TaskOptions &task_options,
|
||||
std::vector<ObjectID> *return_ids) {
|
||||
return Status::OK();
|
||||
auto &context = core_worker_.worker_context_;
|
||||
auto next_task_index = context.GetNextTaskIndex();
|
||||
const auto task_id = GenerateTaskId(context.GetCurrentDriverID(),
|
||||
context.GetCurrentTaskID(), next_task_index);
|
||||
|
||||
// add one for actor cursor object id.
|
||||
auto num_returns = task_options.num_returns + 1;
|
||||
(*return_ids).resize(num_returns);
|
||||
for (int i = 0; i < num_returns; i++) {
|
||||
(*return_ids)[i] = ObjectID::ForTaskReturn(task_id, i + 1);
|
||||
}
|
||||
|
||||
auto actor_creation_dummy_object_id =
|
||||
ObjectID::FromBinary(actor_handle.ActorID().Binary());
|
||||
|
||||
auto task_arguments = BuildTaskArguments(args);
|
||||
auto language = ToTaskLanguage(function.language);
|
||||
|
||||
std::vector<ActorHandleID> new_actor_handles;
|
||||
ray::raylet::TaskSpecification spec(
|
||||
context.GetCurrentDriverID(), context.GetCurrentTaskID(), next_task_index,
|
||||
ActorID::Nil(), actor_creation_dummy_object_id, 0, actor_handle.ActorID(),
|
||||
actor_handle.ActorHandleID(), actor_handle.IncreaseTaskCounter(), new_actor_handles,
|
||||
task_arguments, num_returns, task_options.resources, task_options.resources,
|
||||
language, function.function_descriptor);
|
||||
|
||||
std::vector<ObjectID> execution_dependencies;
|
||||
execution_dependencies.push_back(actor_handle.ActorCursor());
|
||||
|
||||
auto actor_cursor = (*return_ids).back();
|
||||
actor_handle.SetActorCursor(actor_cursor);
|
||||
actor_handle.ClearNewActorHandles();
|
||||
|
||||
auto status = core_worker_.raylet_client_->SubmitTask(execution_dependencies, spec);
|
||||
|
||||
// remove cursor from return ids.
|
||||
(*return_ids).pop_back();
|
||||
return status;
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<raylet::TaskArgument>>
|
||||
CoreWorkerTaskInterface::BuildTaskArguments(const std::vector<TaskArg> &args) {
|
||||
std::vector<std::shared_ptr<raylet::TaskArgument>> task_arguments;
|
||||
for (const auto &arg : args) {
|
||||
if (arg.IsPassedByReference()) {
|
||||
std::vector<ObjectID> references{arg.GetReference()};
|
||||
task_arguments.push_back(
|
||||
std::make_shared<raylet::TaskArgumentByReference>(references));
|
||||
} else {
|
||||
auto data = arg.GetValue();
|
||||
task_arguments.push_back(
|
||||
std::make_shared<raylet::TaskArgumentByValue>(data->Data(), data->Size()));
|
||||
}
|
||||
}
|
||||
return task_arguments;
|
||||
}
|
||||
|
||||
::Language CoreWorkerTaskInterface::ToTaskLanguage(WorkerLanguage language) {
|
||||
switch (language) {
|
||||
case ray::WorkerLanguage::JAVA:
|
||||
return ::Language::JAVA;
|
||||
break;
|
||||
case ray::WorkerLanguage::PYTHON:
|
||||
return ::Language::PYTHON;
|
||||
break;
|
||||
default:
|
||||
RAY_LOG(FATAL) << "invalid language specified: " << static_cast<int>(language);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ray
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
#ifndef RAY_CORE_WORKER_TASK_INTERFACE_H
|
||||
#define RAY_CORE_WORKER_TASK_INTERFACE_H
|
||||
|
||||
#include "common.h"
|
||||
#include <list>
|
||||
|
||||
#include "ray/common/buffer.h"
|
||||
#include "ray/common/id.h"
|
||||
#include "ray/common/status.h"
|
||||
#include "ray/core_worker/common.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
@@ -12,6 +14,10 @@ class CoreWorker;
|
||||
|
||||
/// Options of a non-actor-creation task.
|
||||
struct TaskOptions {
|
||||
TaskOptions() {}
|
||||
TaskOptions(int num_returns, const std::unordered_map<std::string, double> &resources)
|
||||
: num_returns(num_returns), resources(resources) {}
|
||||
|
||||
/// Number of returns of this task.
|
||||
const int num_returns = 1;
|
||||
/// Resources required by this task.
|
||||
@@ -20,6 +26,11 @@ struct TaskOptions {
|
||||
|
||||
/// Options of an actor creation task.
|
||||
struct ActorCreationOptions {
|
||||
ActorCreationOptions() {}
|
||||
ActorCreationOptions(uint64_t max_reconstructions,
|
||||
const std::unordered_map<std::string, double> &resources)
|
||||
: max_reconstructions(max_reconstructions), resources(resources) {}
|
||||
|
||||
/// Maximum number of times that the actor should be reconstructed when it dies
|
||||
/// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed.
|
||||
const uint64_t max_reconstructions = 0;
|
||||
@@ -31,19 +42,46 @@ struct ActorCreationOptions {
|
||||
class ActorHandle {
|
||||
public:
|
||||
ActorHandle(const ActorID &actor_id, const ActorHandleID &actor_handle_id)
|
||||
: actor_id_(actor_id), actor_handle_id_(actor_handle_id) {}
|
||||
: actor_id_(actor_id),
|
||||
actor_handle_id_(actor_handle_id),
|
||||
actor_cursor_(ObjectID::FromBinary(actor_id.Binary())),
|
||||
task_counter_(0) {}
|
||||
|
||||
/// ID of the actor.
|
||||
const class ActorID &ActorID() const { return actor_id_; }
|
||||
const ray::ActorID &ActorID() const { return actor_id_; };
|
||||
|
||||
/// ID of this actor handle.
|
||||
const class ActorHandleID &ActorHandleID() const { return actor_handle_id_; }
|
||||
const ray::ActorHandleID &ActorHandleID() const { return actor_handle_id_; };
|
||||
|
||||
private:
|
||||
/// Cursor of this actor.
|
||||
const ObjectID &ActorCursor() const { return actor_cursor_; };
|
||||
|
||||
/// Set actor cursor.
|
||||
void SetActorCursor(const ObjectID &actor_cursor) { actor_cursor_ = actor_cursor; };
|
||||
|
||||
/// Increase task counter.
|
||||
int IncreaseTaskCounter() { return task_counter_++; }
|
||||
|
||||
std::list<ray::ActorHandleID> GetNewActorHandle() {
|
||||
// TODO: implement this.
|
||||
return std::list<ray::ActorHandleID>();
|
||||
}
|
||||
|
||||
void ClearNewActorHandles() { /* TODO: implement this. */
|
||||
}
|
||||
|
||||
private:
|
||||
/// ID of the actor.
|
||||
const class ActorID actor_id_;
|
||||
const ray::ActorID actor_id_;
|
||||
/// ID of this actor handle.
|
||||
const class ActorHandleID actor_handle_id_;
|
||||
const ray::ActorHandleID actor_handle_id_;
|
||||
/// ID of this actor cursor.
|
||||
ObjectID actor_cursor_;
|
||||
/// Counter for tasks from this handle.
|
||||
int task_counter_;
|
||||
|
||||
friend class CoreWorkerTaskInterface;
|
||||
};
|
||||
|
||||
/// The interface that contains all `CoreWorker` methods that are related to task
|
||||
@@ -71,7 +109,7 @@ class CoreWorkerTaskInterface {
|
||||
/// \return Status.
|
||||
Status CreateActor(const RayFunction &function, const std::vector<TaskArg> &args,
|
||||
const ActorCreationOptions &actor_creation_options,
|
||||
ActorHandle *actor_handle);
|
||||
std::unique_ptr<ActorHandle> *actor_handle);
|
||||
|
||||
/// Submit an actor task.
|
||||
///
|
||||
@@ -89,6 +127,20 @@ class CoreWorkerTaskInterface {
|
||||
private:
|
||||
/// Reference to the parent CoreWorker instance.
|
||||
CoreWorker &core_worker_;
|
||||
|
||||
private:
|
||||
/// Build the arguments for a task spec.
|
||||
///
|
||||
/// \param[in] args Arguments of a task.
|
||||
/// \return Arguments as required by task spec.
|
||||
std::vector<std::shared_ptr<raylet::TaskArgument>> BuildTaskArguments(
|
||||
const std::vector<TaskArg> &args);
|
||||
|
||||
/// Translate from WorkLanguage to Language type (required by taks spec).
|
||||
///
|
||||
/// \param[in] language Language for a task.
|
||||
/// \return Translated task language.
|
||||
::Language ToTaskLanguage(WorkerLanguage language);
|
||||
};
|
||||
|
||||
} // namespace ray
|
||||
|
||||
@@ -2143,7 +2143,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
|
||||
// Notify the task dependency manager that this object is local.
|
||||
const auto ready_task_ids = task_dependency_manager_.HandleObjectLocal(object_id);
|
||||
RAY_LOG(DEBUG) << "Object local " << object_id << ", "
|
||||
<< " on " << gcs_client_->client_table().GetLocalClientId()
|
||||
<< " on " << gcs_client_->client_table().GetLocalClientId() << ", "
|
||||
<< ready_task_ids.size() << " tasks ready";
|
||||
// Transition the tasks whose dependencies are now fulfilled to the ready state.
|
||||
if (ready_task_ids.size() > 0) {
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
set -e
|
||||
set -x
|
||||
|
||||
bazel build "//:core_worker_test" "//:raylet" "//:libray_redis_module.so" "@plasma//:plasma_store_server"
|
||||
bazel build "//:core_worker_test" "//:mock_worker" "//:raylet" "//:libray_redis_module.so" "@plasma//:plasma_store_server"
|
||||
|
||||
# Get the directory in which this script is executing.
|
||||
SCRIPT_DIR="`dirname \"$0\"`"
|
||||
@@ -26,6 +26,7 @@ REDIS_MODULE="./bazel-bin/libray_redis_module.so"
|
||||
LOAD_MODULE_ARGS="--loadmodule ${REDIS_MODULE}"
|
||||
STORE_EXEC="./bazel-bin/external/plasma/plasma_store_server"
|
||||
RAYLET_EXEC="./bazel-bin/raylet"
|
||||
MOCK_WORKER_EXEC="./bazel-bin/mock_worker"
|
||||
|
||||
# Allow cleanup commands to fail.
|
||||
bazel run //:redis-cli -- -p 6379 shutdown || true
|
||||
@@ -37,7 +38,7 @@ sleep 2s
|
||||
bazel run //:redis-server -- --loglevel warning ${LOAD_MODULE_ARGS} --port 6380 &
|
||||
sleep 2s
|
||||
# Run tests.
|
||||
./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC
|
||||
./bazel-bin/core_worker_test $STORE_EXEC $RAYLET_EXEC $MOCK_WORKER_EXEC
|
||||
sleep 1s
|
||||
bazel run //:redis-cli -- -p 6379 shutdown
|
||||
bazel run //:redis-cli -- -p 6380 shutdown
|
||||
|
||||
Reference in New Issue
Block a user