Caching task resource requirements. (#3231)

* caching resource requirements

* small fixes

* avoid copying the resource map
This commit is contained in:
Ion
2018-11-06 01:14:09 +02:00
committed by Philipp Moritz
parent 813f51769f
commit d8ae9de99c
2 changed files with 21 additions and 18 deletions
+17 -17
View File
@@ -32,6 +32,18 @@ flatbuffers::Offset<Arg> TaskArgumentByValue::ToFlatbuffer(
void TaskSpecification::AssignSpecification(const uint8_t *spec, size_t spec_size) {
spec_.assign(spec, spec + spec_size);
// Initialize required_resources_ and required_placement_resources_
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
auto required_resources = map_from_flatbuf(*message->required_resources());
auto required_placement_resources =
map_from_flatbuf(*message->required_placement_resources());
// If the required_placement_resources field is empty, then the placement
// resources default to the required resources.
if (required_placement_resources.size() == 0) {
required_placement_resources = required_resources;
}
required_resources_ = ResourceSet(required_resources);
required_placement_resources_ = ResourceSet(required_placement_resources);
}
TaskSpecification::TaskSpecification(const flatbuffers::String &string) {
@@ -168,30 +180,18 @@ size_t TaskSpecification::ArgValLength(int64_t arg_index) const {
}
double TaskSpecification::GetRequiredResource(const std::string &resource_name) const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
auto required_resources = map_from_flatbuf(*message->required_resources());
auto it = required_resources.find(resource_name);
RAY_CHECK(it != required_resources.end());
RAY_CHECK(required_resources_.GetResourceMap().empty() == false);
auto it = required_resources_.GetResourceMap().find(resource_name);
RAY_CHECK(it != required_resources_.GetResourceMap().end());
return it->second;
}
const ResourceSet TaskSpecification::GetRequiredResources() const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
auto required_resources = map_from_flatbuf(*message->required_resources());
return ResourceSet(required_resources);
return required_resources_;
}
const ResourceSet TaskSpecification::GetRequiredPlacementResources() const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
auto required_placement_resources =
map_from_flatbuf(*message->required_placement_resources());
// If the required_placement_resources field is empty, then the placement
// resources default to the required resources.
if (required_placement_resources.size() == 0) {
required_placement_resources = map_from_flatbuf(*message->required_resources());
}
return ResourceSet(required_placement_resources);
return required_placement_resources_;
}
bool TaskSpecification::IsDriverTask() const {
+4 -1
View File
@@ -204,7 +204,10 @@ class TaskSpecification {
const uint8_t *data() const;
/// Get the size in bytes of the task specification.
size_t size() const;
/// Field storing required resources. Initalized in constructor.
ResourceSet required_resources_;
/// Field storing required placement resources. Initalized in constructor.
ResourceSet required_placement_resources_;
/// The task specification data.
std::vector<uint8_t> spec_;
};