[api] API deprecations and cleanups for 1.0 (internal_config and Checkpointable actor) (#10333)

* remove

* internal config updates, remove Checkpointable

* Lower object timeout default

* remove json

* Fix flaky test

* Fix unit test
This commit is contained in:
Stephanie Wang
2020-08-27 10:19:53 -07:00
committed by GitHub
parent 0aec4cbccb
commit f75dfd60a3
56 changed files with 239 additions and 1267 deletions
+7 -68
View File
@@ -131,11 +131,12 @@ RAY_CONFIG(int64_t, grpc_server_retry_timeout_milliseconds, 1000)
// of creation retries will be MAX(actor_creation_min_retries, max_restarts).
RAY_CONFIG(uint64_t, actor_creation_min_retries, 3)
/// The initial period for a task execution lease. The lease will expire this
/// many milliseconds after the first acquisition of the lease. Nodes that
/// require an object will not try to reconstruct the task until at least
/// this many milliseconds.
RAY_CONFIG(int64_t, initial_reconstruction_timeout_milliseconds, 10000)
/// When trying to resolve an object, the initial period that the raylet will
/// wait before contacting the object's owner to check if the object is still
/// available. This is a lower bound on the time to report the loss of an
/// object stored in the distributed object store in the case that the worker
/// that created the original ObjectRef dies.
RAY_CONFIG(int64_t, object_timeout_milliseconds, 100)
/// The maximum duration that workers can hold on to another worker's lease
/// for direct task submission until it must be returned to the raylet.
@@ -151,15 +152,6 @@ RAY_CONFIG(int64_t, get_timeout_milliseconds, 1000)
RAY_CONFIG(int64_t, worker_get_request_size, 10000)
RAY_CONFIG(int64_t, worker_fetch_request_size, 10000)
/// This is used to bound the size of the Raylet's lineage cache. This is
/// the maximum uncommitted lineage size that any remote task in the cache
/// can have before eviction will be attempted.
RAY_CONFIG(uint64_t, max_lineage_size, 100)
/// This is a temporary constant used by actors to determine how many dummy
/// objects to store.
RAY_CONFIG(int64_t, actor_max_dummy_objects, 1000)
/// Number of times raylet client tries connecting to a raylet.
RAY_CONFIG(int64_t, raylet_client_num_connect_attempts, 10)
RAY_CONFIG(int64_t, raylet_client_connect_timeout_milliseconds, 1000)
@@ -169,62 +161,18 @@ RAY_CONFIG(int64_t, raylet_client_connect_timeout_milliseconds, 1000)
/// the number of missing task dependencies.
RAY_CONFIG(int64_t, raylet_fetch_timeout_milliseconds, 1000)
/// The duration that the raylet will wait between initiating
/// reconstruction calls for missing task dependencies. If there are many
/// missing task dependencies, we will only initiate reconstruction calls for
/// some of them each time.
RAY_CONFIG(int64_t, raylet_reconstruction_timeout_milliseconds, 1000)
/// The maximum number of objects that the raylet will issue
/// reconstruct calls for in a single pass through the reconstruct object
/// timeout handler.
RAY_CONFIG(int64_t, max_num_to_reconstruct, 10000)
/// The maximum number of objects to include in a single fetch request in the
/// regular raylet fetch timeout handler.
RAY_CONFIG(int64_t, raylet_fetch_request_size, 10000)
/// The maximum number of active object IDs to report in a heartbeat.
/// # NOTE: currently disabled by default.
RAY_CONFIG(size_t, raylet_max_active_object_ids, 0)
/// The duration that we wait after sending a worker SIGTERM before sending
/// the worker SIGKILL.
RAY_CONFIG(int64_t, kill_worker_timeout_milliseconds, 100)
/// The duration that we wait after the worekr is launched before the
/// The duration that we wait after the worker is launched before the
/// starting_worker_timeout_callback() is called.
RAY_CONFIG(int64_t, worker_register_timeout_seconds, 30)
/// This is a timeout used to cause failures in the plasma manager and raylet
/// when certain event loop handlers take too long.
RAY_CONFIG(int64_t, max_time_for_handler_milliseconds, 1000)
/// This is used to cause failures when a certain loop in redis.cc which
/// synchronously looks up object manager addresses in redis is slow.
RAY_CONFIG(int64_t, max_time_for_loop, 1000)
/// Allow up to 5 seconds for connecting to Redis.
RAY_CONFIG(int64_t, redis_db_connect_retries, 50)
RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100)
/// TODO(rkn): These constants are currently unused.
RAY_CONFIG(int64_t, plasma_default_release_delay, 64)
RAY_CONFIG(int64_t, L3_cache_size_bytes, 100000000)
/// Constants for the spillback scheduling policy.
RAY_CONFIG(int64_t, max_tasks_to_spillback, 10)
/// Every time an actor creation task has been spilled back a number of times
/// that is a multiple of this quantity, a warning will be pushed to the
/// corresponding driver. Since spillback currently occurs on a 100ms timer,
/// a value of 100 corresponds to a warning every 10 seconds.
RAY_CONFIG(int64_t, actor_creation_num_spillbacks_warning, 100)
/// If a node manager attempts to forward a task to another node manager and
/// the forward fails, then it will resubmit the task after this duration.
RAY_CONFIG(int64_t, node_manager_forward_task_retry_timeout_milliseconds, 1000)
/// Timeout, in milliseconds, to wait before retrying a failed pull in the
/// ObjectManager.
RAY_CONFIG(int, object_manager_pull_timeout_ms, 10000)
@@ -251,15 +199,6 @@ RAY_CONFIG(int, num_workers_per_process_python, 1)
/// Number of workers per Java worker process
RAY_CONFIG(int, num_workers_per_process_java, 10)
/// Maximum timeout in milliseconds within which a task lease must be renewed.
RAY_CONFIG(int64_t, max_task_lease_timeout_ms, 60000)
/// Maximum number of checkpoints to keep in GCS for an actor.
/// Note: this number should be set to at least 2, because saving an application
/// checkpoint isn't atomic with saving the backend checkpoint, and it will break
/// if this number is set to 1 and users save application checkpoints in place.
RAY_CONFIG(int32_t, num_actor_checkpoints_to_keep, 20)
/// Maximum number of ids in one batch to send to GCS to delete keys.
RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000)
+2 -2
View File
@@ -124,7 +124,7 @@ std::string TestSetupUtil::StartGcsServer(const std::string &redis_address) {
ray::JoinPaths(ray::GetUserTempDir(), "gcs_server" + ObjectID::FromRandom().Hex());
std::vector<std::string> cmdargs(
{TEST_GCS_SERVER_EXEC_PATH, "--redis_address=" + redis_address, "--redis_port=6379",
"--config_list=initial_reconstruction_timeout_milliseconds,2000"});
"--config_list=object_timeout_milliseconds,2000"});
RAY_LOG(INFO) << "Start gcs server command: " << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true, gcs_server_socket_name + ".pid").second);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
@@ -153,7 +153,7 @@ std::string TestSetupUtil::StartRaylet(const std::string &store_socket_name,
"--python_worker_command=" +
CreateCommandLine({TEST_MOCK_WORKER_EXEC_PATH, store_socket_name,
raylet_socket_name, std::to_string(port)}),
"--config_list=initial_reconstruction_timeout_milliseconds,2000"});
"--config_list=object_timeout_milliseconds,2000"});
RAY_LOG(DEBUG) << "Raylet Start command: " << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true, raylet_socket_name + ".pid").second);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
+3 -3
View File
@@ -302,11 +302,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_);
ClientID local_raylet_id;
int assigned_port;
std::unordered_map<std::string, std::string> internal_config;
std::unordered_map<std::string, std::string> system_config;
local_raylet_client_ = std::shared_ptr<raylet::RayletClient>(new raylet::RayletClient(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
options_.worker_type, worker_context_.GetCurrentJobID(), options_.language,
options_.node_ip_address, &local_raylet_id, &assigned_port, &internal_config,
options_.node_ip_address, &local_raylet_id, &assigned_port, &system_config,
options_.serialized_job_config));
connected_ = true;
@@ -316,7 +316,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
"start'.";
// NOTE(edoakes): any initialization depending on RayConfig must happen after this line.
RayConfig::instance().initialize(internal_config);
RayConfig::instance().initialize(system_config);
// Start RPC server after all the task receivers are properly initialized and we have
// our assigned port from the raylet.
core_worker_server_ = std::unique_ptr<rpc::GrpcServer>(
@@ -86,9 +86,8 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetInternalConfig(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto internal_config_string = gcs_accessor->GetInternalConfig();
return static_cast<jbyteArray>(
NativeStringToJavaByteArray(env, internal_config_string));
auto system_config_string = gcs_accessor->GetInternalConfig();
return static_cast<jbyteArray>(NativeStringToJavaByteArray(env, system_config_string));
}
JNIEXPORT jobject JNICALL
+5 -5
View File
@@ -342,16 +342,16 @@ void GcsNodeManager::HandleSetInternalConfig(const rpc::SetInternalConfigRequest
void GcsNodeManager::HandleGetInternalConfig(const rpc::GetInternalConfigRequest &request,
rpc::GetInternalConfigReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto get_internal_config = [reply, send_reply_callback](
ray::Status status,
const boost::optional<rpc::StoredConfig> &config) {
auto get_system_config = [reply, send_reply_callback](
ray::Status status,
const boost::optional<rpc::StoredConfig> &config) {
if (config.has_value()) {
reply->mutable_config()->CopyFrom(config.get());
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
RAY_CHECK_OK(gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(),
get_internal_config));
RAY_CHECK_OK(
gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(), get_system_config));
}
std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::GetNode(
+5 -5
View File
@@ -396,8 +396,8 @@ class GcsTableStorage {
}
GcsInternalConfigTable &InternalConfigTable() {
RAY_CHECK(internal_config_table_ != nullptr);
return *internal_config_table_;
RAY_CHECK(system_config_table_ != nullptr);
return *system_config_table_;
}
protected:
@@ -418,7 +418,7 @@ class GcsTableStorage {
std::unique_ptr<GcsHeartbeatBatchTable> heartbeat_batch_table_;
std::unique_ptr<GcsProfileTable> profile_table_;
std::unique_ptr<GcsWorkerTable> worker_table_;
std::unique_ptr<GcsInternalConfigTable> internal_config_table_;
std::unique_ptr<GcsInternalConfigTable> system_config_table_;
};
/// \class RedisGcsTableStorage
@@ -447,7 +447,7 @@ class RedisGcsTableStorage : public GcsTableStorage {
heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_));
profile_table_.reset(new GcsProfileTable(store_client_));
worker_table_.reset(new GcsWorkerTable(store_client_));
internal_config_table_.reset(new GcsInternalConfigTable(store_client_));
system_config_table_.reset(new GcsInternalConfigTable(store_client_));
}
};
@@ -475,7 +475,7 @@ class InMemoryGcsTableStorage : public GcsTableStorage {
heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_));
profile_table_.reset(new GcsProfileTable(store_client_));
worker_table_.reset(new GcsWorkerTable(store_client_));
internal_config_table_.reset(new GcsInternalConfigTable(store_client_));
system_config_table_.reset(new GcsInternalConfigTable(store_client_));
}
};
+3 -1
View File
@@ -834,7 +834,9 @@ Status ActorCheckpointIdTable::AddCheckpointId(const JobID &job_id,
std::make_shared<ActorCheckpointIdData>(data);
copy->add_timestamps(absl::GetCurrentTimeNanos() / 1000000);
copy->add_checkpoint_ids(checkpoint_id.Binary());
auto num_to_keep = RayConfig::instance().num_actor_checkpoints_to_keep();
// TODO(swang): This is a temporary value while we deprecate the actor
// checkpoint table.
auto num_to_keep = 20;
while (copy->timestamps().size() > num_to_keep) {
// Delete the checkpoint from actor checkpoint table.
const auto &to_delete = ActorCheckpointID::FromBinary(copy->checkpoint_ids(0));
+2 -2
View File
@@ -155,9 +155,9 @@ table RegisterClientReply {
// Port that this worker should listen on.
port: int;
// Keys for internal config options.
internal_config_keys: [string];
system_config_keys: [string];
// Values for internal config options corresponding to keys above.
internal_config_values: [string];
system_config_values: [string];
}
table AnnounceWorkerPort {
+1 -2
View File
@@ -114,7 +114,7 @@ int main(int argc, char *argv[]) {
RAY_CHECK_OK(gcs_client->Connect(main_service));
// The internal_config is only set on the head node--other nodes get it from GCS.
// The system_config is only set on the head node--other nodes get it from GCS.
if (head_node) {
// Parse the configuration list.
std::istringstream config_string(config_list);
@@ -202,7 +202,6 @@ int main(int argc, char *argv[]) {
RayConfig::instance().fair_queueing_enabled();
node_manager_config.object_pinning_enabled =
RayConfig::instance().object_pinning_enabled();
node_manager_config.max_lineage_size = RayConfig::instance().max_lineage_size();
node_manager_config.store_socket_name = store_socket_name;
node_manager_config.temp_dir = temp_dir;
node_manager_config.session_dir = session_dir;
+9 -24
View File
@@ -152,12 +152,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
[this](const TaskID &task_id, const ObjectID &required_object_id) {
HandleTaskReconstruction(task_id, required_object_id);
},
RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
self_node_id_, gcs_client_, object_directory_),
task_dependency_manager_(
object_manager, reconstruction_policy_, io_service, self_node_id_,
RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
gcs_client_),
RayConfig::instance().object_timeout_milliseconds(), self_node_id_, gcs_client_,
object_directory_),
task_dependency_manager_(object_manager, reconstruction_policy_),
actor_registry_(),
node_manager_server_("NodeManager", config.node_manager_port),
node_manager_service_(io_service, *this),
@@ -1262,16 +1259,16 @@ void NodeManager::ProcessRegisterClientRequestMessage(
auto send_reply_callback = [this, client](int assigned_port) {
flatbuffers::FlatBufferBuilder fbb;
std::vector<std::string> internal_config_keys;
std::vector<std::string> internal_config_values;
std::vector<std::string> system_config_keys;
std::vector<std::string> system_config_values;
for (auto kv : initial_config_.raylet_config) {
internal_config_keys.push_back(kv.first);
internal_config_values.push_back(kv.second);
system_config_keys.push_back(kv.first);
system_config_values.push_back(kv.second);
}
auto reply = ray::protocol::CreateRegisterClientReply(
fbb, to_flatbuf(fbb, self_node_id_), assigned_port,
string_vec_to_flatbuf(fbb, internal_config_keys),
string_vec_to_flatbuf(fbb, internal_config_values));
string_vec_to_flatbuf(fbb, system_config_keys),
string_vec_to_flatbuf(fbb, system_config_values));
fbb.Finish(reply);
client->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::RegisterClientReply), fbb.GetSize(),
@@ -2114,18 +2111,6 @@ void NodeManager::ScheduleTasks(
RAY_CHECK(local_queues_.GetTasks(TaskState::PLACEABLE).size() == 0);
}
bool NodeManager::CheckDependencyManagerInvariant() const {
std::vector<TaskID> pending_task_ids = task_dependency_manager_.GetPendingTasks();
// Assert that each pending task in the task dependency manager is in one of the queues.
for (const auto &task_id : pending_task_ids) {
if (!local_queues_.HasTask(task_id)) {
return false;
}
}
// TODO(atumanov): perform the check in the opposite direction.
return true;
}
void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type) {
const TaskSpecification &spec = task.GetTaskSpecification();
RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed because of error "
-9
View File
@@ -87,8 +87,6 @@ struct NodeManagerConfig {
bool fair_queueing_enabled;
/// Whether to enable pinning for plasma objects.
bool object_pinning_enabled;
/// the maximum lineage size.
uint64_t max_lineage_size;
/// The store socket name.
std::string store_socket_name;
/// The path to the ray temp dir.
@@ -438,13 +436,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return Void.
void HandleJobFinished(const JobID &job_id, const JobTableData &job_data);
/// Check if certain invariants associated with the task dependency manager
/// and the local queues are satisfied. This is only used for debugging
/// purposes.
///
/// \return True if the invariants are satisfied and false otherwise.
bool CheckDependencyManagerInvariant() const;
/// Process client message of SubmitTask
///
/// \param message_data A pointer to the message data.
+3 -61
View File
@@ -23,15 +23,8 @@ namespace raylet {
TaskDependencyManager::TaskDependencyManager(
ObjectManagerInterface &object_manager,
ReconstructionPolicyInterface &reconstruction_policy,
boost::asio::io_service &io_service, const ClientID &client_id,
int64_t initial_lease_period_ms, std::shared_ptr<gcs::GcsClient> gcs_client)
: object_manager_(object_manager),
reconstruction_policy_(reconstruction_policy),
io_service_(io_service),
client_id_(client_id),
initial_lease_period_ms_(initial_lease_period_ms),
gcs_client_(gcs_client) {}
ReconstructionPolicyInterface &reconstruction_policy)
: object_manager_(object_manager), reconstruction_policy_(reconstruction_policy) {}
bool TaskDependencyManager::CheckObjectLocal(const ObjectID &object_id) const {
return local_objects_.count(object_id) == 1;
@@ -334,15 +327,6 @@ void TaskDependencyManager::UnsubscribeWaitDependencies(const WorkerID &worker_i
}
}
std::vector<TaskID> TaskDependencyManager::GetPendingTasks() const {
std::vector<TaskID> keys;
keys.reserve(pending_tasks_.size());
for (const auto &id_task_pair : pending_tasks_) {
keys.push_back(id_task_pair.first);
}
return keys;
}
void TaskDependencyManager::TaskPending(const Task &task) {
// Direct tasks are not tracked by the raylet.
// NOTE(zhijunfu): Direct tasks are not tracked by the raylet,
@@ -380,8 +364,7 @@ void TaskDependencyManager::TaskPending(const Task &task) {
RAY_LOG(DEBUG) << "Task execution " << task_id << " pending";
// Record that the task is pending execution.
auto inserted =
pending_tasks_.emplace(task_id, PendingTask(initial_lease_period_ms_, io_service_));
auto inserted = pending_tasks_.insert(task_id);
if (inserted.second) {
// This is the first time we've heard that this task is pending. Find any
// subscribed tasks that are dependent on objects created by the pending
@@ -395,50 +378,9 @@ void TaskDependencyManager::TaskPending(const Task &task) {
HandleRemoteDependencyCanceled(object_entry.first);
}
}
// Acquire the lease for the task's execution in the global lease table.
AcquireTaskLease(task_id);
}
}
/// Acquire (or renew) the lease for a task that is pending on this node.
///
/// Does nothing if the task is not in `pending_tasks_`. Otherwise a task lease
/// entry carrying the task's current lease period is added through the GCS
/// client, the task's timer is armed to call this method again after half of
/// that period, and the period used for the next renewal is doubled, capped at
/// RayConfig::max_task_lease_timeout_ms().
///
/// \param task_id The ID of the task whose lease should be acquired.
void TaskDependencyManager::AcquireTaskLease(const TaskID &task_id) {
  auto it = pending_tasks_.find(task_id);
  int64_t now_ms = current_time_ms();
  if (it == pending_tasks_.end()) {
    return;
  }
  auto &pending_task = it->second;

  // Check that we were able to renew the task lease before the previous one
  // expired. Report how far past the expiration we are as a positive
  // duration (now - expires_at); the reverse subtraction is always negative
  // in this branch.
  if (now_ms > pending_task.expires_at) {
    RAY_LOG(WARNING) << "Task " << task_id << " lease to renew has already expired by "
                     << (now_ms - pending_task.expires_at) << "ms";
  }

  auto task_lease_data = std::make_shared<TaskLeaseData>();
  task_lease_data->set_task_id(task_id.Binary());
  task_lease_data->set_node_manager_id(client_id_.Binary());
  task_lease_data->set_acquired_at(absl::GetCurrentTimeNanos() / 1000000);
  task_lease_data->set_timeout(pending_task.lease_period);
  RAY_CHECK_OK(gcs_client_->Tasks().AsyncAddTaskLease(task_lease_data, nullptr));

  // Renew after half the lease period so the renewal lands before expiration.
  auto period = boost::posix_time::milliseconds(pending_task.lease_period / 2);
  pending_task.lease_timer->expires_from_now(period);
  pending_task.lease_timer->async_wait(
      [this, task_id](const boost::system::error_code &error) {
        if (!error) {
          AcquireTaskLease(task_id);
        } else {
          // Check that the error was due to the timer being canceled.
          RAY_CHECK(error == boost::asio::error::operation_aborted);
        }
      });

  // Record when the lease just written expires, then back off: the next
  // lease period is doubled, up to the configured maximum.
  pending_task.expires_at = now_ms + pending_task.lease_period;
  pending_task.lease_period = std::min(pending_task.lease_period * 2,
                                       RayConfig::instance().max_task_lease_timeout_ms());
}
void TaskDependencyManager::TaskCanceled(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Task execution " << task_id << " canceled";
// Record that the task is no longer pending execution.
+2 -43
View File
@@ -44,10 +44,7 @@ class TaskDependencyManager {
public:
/// Create a task dependency manager.
TaskDependencyManager(ObjectManagerInterface &object_manager,
ReconstructionPolicyInterface &reconstruction_policy,
boost::asio::io_service &io_service, const ClientID &client_id,
int64_t initial_lease_period_ms,
std::shared_ptr<gcs::GcsClient> gcs_client);
ReconstructionPolicyInterface &reconstruction_policy);
/// Check whether an object is locally available.
///
@@ -142,12 +139,6 @@ class TaskDependencyManager {
/// this object dependency.
std::vector<TaskID> HandleObjectMissing(const ray::ObjectID &object_id);
/// Get a list of all Tasks currently marked as pending object dependencies in the task
/// dependency manager.
///
/// \return Return a vector of TaskIDs for tasks registered as pending.
std::vector<TaskID> GetPendingTasks() const;
/// Remove all of the tasks specified. These tasks will no longer be
/// considered pending and the objects they depend on will no longer be
/// required.
@@ -208,21 +199,6 @@ class TaskDependencyManager {
/// will be automatically removed from this set once it becomes local.
using WorkerDependencies = std::unordered_set<ObjectID>;
/// Per-task lease bookkeeping for a task that is pending execution.
struct PendingTask {
  /// \param initial_lease_period_ms The lease period to start with.
  /// \param io_service The event loop that the lease timer is created on.
  PendingTask(int64_t initial_lease_period_ms, boost::asio::io_service &io_service)
      : lease_period(initial_lease_period_ms),
        lease_timer(new boost::asio::deadline_timer(io_service)) {}

  /// The timeout within which the lease should be renewed.
  int64_t lease_period;
  /// The time at which the current lease will expire, according to this
  /// node's steady clock. Starts at INT64_MAX.
  int64_t expires_at = INT64_MAX;
  /// A timer used to determine when to next renew the lease.
  std::unique_ptr<boost::asio::deadline_timer> lease_timer;
};
/// Check whether the given object needs to be made available through object
/// transfer or reconstruction. These are objects for which: (1) there is a
/// subscribed task dependent on it, (2) the object is not local, and (3) the
@@ -235,29 +211,12 @@ class TaskDependencyManager {
/// operations to make the object available through object transfer or
/// reconstruction.
void HandleRemoteDependencyCanceled(const ObjectID &object_id);
/// Acquire the task lease in the GCS for the given task. This is used to
/// indicate to other nodes that the task is currently pending on this node.
/// The task lease has an expiration time. If we do not renew the lease
/// before that time, then other nodes may choose to execute the task.
void AcquireTaskLease(const TaskID &task_id);
/// The object manager, used to fetch required objects from remote nodes.
ObjectManagerInterface &object_manager_;
/// The reconstruction policy, used to reconstruct required objects that no
/// longer exist on any live nodes.
ReconstructionPolicyInterface &reconstruction_policy_;
/// The event loop, used to set timers for renewing task leases. The task
/// leases are used to indicate which tasks are pending execution on this
/// node and must be periodically renewed.
boost::asio::io_service &io_service_;
/// This node's GCS client ID, used in the task lease information.
const ClientID client_id_;
/// For a given task, the expiration period of the initial task lease that is
/// added to the GCS. The lease expiration period is doubled every time the
/// lease is renewed.
const int64_t initial_lease_period_ms_;
/// A client connection to the GCS.
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// A mapping from task ID of each subscribed task to its list of object
/// dependencies, either task arguments or objects passed into `ray.get`.
std::unordered_map<ray::TaskID, TaskDependencies> task_dependencies_;
@@ -277,7 +236,7 @@ class TaskDependencyManager {
std::unordered_set<ray::ObjectID> local_objects_;
/// The set of tasks that are pending execution. Any objects created by these
/// tasks that are not already local are pending creation.
std::unordered_map<ray::TaskID, PendingTask> pending_tasks_;
std::unordered_set<ray::TaskID> pending_tasks_;
};
} // namespace raylet
+3 -78
View File
@@ -48,60 +48,16 @@ class MockReconstructionPolicy : public ReconstructionPolicyInterface {
MOCK_METHOD1(Cancel, void(const ObjectID &object_id));
};
/// Mock of gcs::RedisTaskInfoAccessor that replaces AsyncAddTaskLease with a
/// gmock method, so tests can set expectations on task lease additions.
class MockTaskInfoAccessor : public gcs::RedisTaskInfoAccessor {
public:
/// \param client The GCS client forwarded to the RedisTaskInfoAccessor base.
MockTaskInfoAccessor(gcs::RedisGcsClient *client)
: gcs::RedisTaskInfoAccessor(client) {}
/// Mocked AsyncAddTaskLease(data_ptr, callback) returning ray::Status.
MOCK_METHOD2(AsyncAddTaskLease,
ray::Status(const std::shared_ptr<TaskLeaseData> &data_ptr,
const gcs::StatusCallback &callback));
};
/// gcs::RedisGcsClient subclass for tests that lets a mock task accessor be
/// installed in place of the client's own.
class MockGcsClient : public gcs::RedisGcsClient {
public:
/// \param options Client options forwarded to the RedisGcsClient base.
MockGcsClient(const gcs::GcsClientOptions &options) : gcs::RedisGcsClient(options) {}
/// Replace the client's task accessor with the given mock.
/// NOTE(review): task_accessor_ presumably takes ownership of the pointer
/// through reset() -- confirm against the base class declaration.
void Init(MockTaskInfoAccessor *task_accessor_mock) {
task_accessor_.reset(task_accessor_mock);
}
};
class TaskDependencyManagerTest : public ::testing::Test {
public:
TaskDependencyManagerTest()
: object_manager_mock_(),
reconstruction_policy_mock_(),
io_service_(),
options_("", 1, ""),
gcs_client_mock_(new MockGcsClient(options_)),
task_accessor_mock_(new MockTaskInfoAccessor(gcs_client_mock_.get())),
initial_lease_period_ms_(100),
task_dependency_manager_(object_manager_mock_, reconstruction_policy_mock_,
io_service_, ClientID::Nil(), initial_lease_period_ms_,
gcs_client_mock_) {
gcs_client_mock_->Init(task_accessor_mock_);
}
void Run(uint64_t timeout_ms) {
auto timer_period = boost::posix_time::milliseconds(timeout_ms);
auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_, timer_period);
timer->async_wait([this](const boost::system::error_code &error) {
ASSERT_FALSE(error);
io_service_.stop();
});
io_service_.run();
io_service_.reset();
}
task_dependency_manager_(object_manager_mock_, reconstruction_policy_mock_) {}
protected:
MockObjectManager object_manager_mock_;
MockReconstructionPolicy reconstruction_policy_mock_;
boost::asio::io_service io_service_;
gcs::GcsClientOptions options_;
std::shared_ptr<MockGcsClient> gcs_client_mock_;
MockTaskInfoAccessor *task_accessor_mock_;
int64_t initial_lease_period_ms_;
TaskDependencyManager task_dependency_manager_;
};
@@ -270,9 +226,7 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) {
ASSERT_FALSE(ready);
}
// Mark each task as pending. A lease entry should be added to the GCS for
// each task.
EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _));
// Mark each task as pending.
task_dependency_manager_.TaskPending(task);
i++;
@@ -323,7 +277,6 @@ TEST_F(TaskDependencyManagerTest, TestDependentPut) {
// it is pending execution.
EXPECT_CALL(object_manager_mock_, CancelPull(put_id));
EXPECT_CALL(reconstruction_policy_mock_, Cancel(put_id));
EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _));
task_dependency_manager_.TaskPending(task1);
}
@@ -336,7 +289,6 @@ TEST_F(TaskDependencyManagerTest, TestTaskForwarding) {
const auto &arguments = task.GetDependencies();
static_cast<void>(task_dependency_manager_.SubscribeGetDependencies(
task.GetTaskSpecification().TaskId(), arguments));
EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _));
task_dependency_manager_.TaskPending(task);
}
@@ -437,31 +389,6 @@ TEST_F(TaskDependencyManagerTest, TestEviction) {
}
}
// Checks that a pending task's lease is acquired once when the task is marked
// pending and is then renewed repeatedly while the event loop runs.
TEST_F(TaskDependencyManagerTest, TestTaskLeaseRenewal) {
// Mark a task as pending.
auto task = ExampleTask({}, 0);
// We expect an initial call to acquire the lease.
EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _));
task_dependency_manager_.TaskPending(task);
// Check that while the task is still pending, there is one call to renew the
// lease for each lease period that passes. The lease period doubles with
// each renewal.
int num_expected_calls = 4;
int64_t sleep_time = 0;
// Accumulates (1 + 2 + ... + num_expected_calls) * initial_lease_period_ms_,
// i.e. 10 * initial_lease_period_ms_ for four expected calls.
for (int i = 1; i <= num_expected_calls; i++) {
sleep_time += i * initial_lease_period_ms_;
}
// With sleep_time = 10 * initial_lease_period_ms_ the test fails, because
// AsyncAddTaskLease is expected to be called four times but is only called
// three times. It is hard to determine the right sleep_time, so double it
// for now.
sleep_time = sleep_time * 2;
// Only a lower bound is asserted, since the exact number of renewals depends
// on timing.
EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _))
.Times(testing::AtLeast(num_expected_calls));
Run(sleep_time);
}
TEST_F(TaskDependencyManagerTest, TestRemoveTasksAndRelatedObjects) {
// Create 3 tasks, each dependent on the previous. The first task has no
// arguments.
@@ -478,9 +405,7 @@ TEST_F(TaskDependencyManagerTest, TestRemoveTasksAndRelatedObjects) {
const auto &arguments = task.GetDependencies();
task_dependency_manager_.SubscribeGetDependencies(
task.GetTaskSpecification().TaskId(), arguments);
// Mark each task as pending. A lease entry should be added to the GCS for
// each task.
EXPECT_CALL(*task_accessor_mock_, AsyncAddTaskLease(_, _));
// Mark each task as pending.
task_dependency_manager_.TaskPending(task);
}
+5 -5
View File
@@ -83,7 +83,7 @@ raylet::RayletClient::RayletClient(
const std::string &raylet_socket, const WorkerID &worker_id,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
const std::string &ip_address, ClientID *raylet_id, int *port,
std::unordered_map<std::string, std::string> *internal_config,
std::unordered_map<std::string, std::string> *system_config,
const std::string &job_config)
: grpc_client_(std::move(grpc_client)),
worker_id_(worker_id),
@@ -110,12 +110,12 @@ raylet::RayletClient::RayletClient(
*raylet_id = ClientID::FromBinary(reply_message->raylet_id()->str());
*port = reply_message->port();
RAY_CHECK(internal_config);
auto keys = reply_message->internal_config_keys();
auto values = reply_message->internal_config_values();
RAY_CHECK(system_config);
auto keys = reply_message->system_config_keys();
auto values = reply_message->system_config_values();
RAY_CHECK(keys->size() == values->size());
for (size_t i = 0; i < keys->size(); i++) {
internal_config->emplace(keys->Get(i)->str(), values->Get(i)->str());
system_config->emplace(keys->Get(i)->str(), values->Get(i)->str());
}
}
+2 -2
View File
@@ -169,7 +169,7 @@ class RayletClient : public PinObjectsInterface,
/// \param language Language of the worker.
/// \param ip_address The IP address of the worker.
/// \param raylet_id This will be populated with the local raylet's ClientID.
/// \param internal_config This will be populated with internal config parameters
/// \param system_config This will be populated with system config parameters
/// provided by the raylet.
/// \param port The port that the worker should listen on for gRPC requests. If
/// 0, the worker should choose a random port.
@@ -178,7 +178,7 @@ class RayletClient : public PinObjectsInterface,
const std::string &raylet_socket, const WorkerID &worker_id,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
const std::string &ip_address, ClientID *raylet_id, int *port,
std::unordered_map<std::string, std::string> *internal_config,
std::unordered_map<std::string, std::string> *system_config,
const std::string &job_config);
/// Connect to the raylet via grpc only.