From fa0892f2852b4b2cf46ea33603ea4cd6e410e97a Mon Sep 17 00:00:00 2001 From: Yuhong Guo Date: Tue, 28 May 2019 13:30:41 +0800 Subject: [PATCH] Replace ReturnIds with NumReturns in TaskInfo to reduce the size (#4854) * Refine TaskInfo * Fix * Add a test to print task info size * Lint * Refine --- .../org/ray/runtime/AbstractRayRuntime.java | 2 +- .../ray/runtime/raylet/RayletClientImpl.java | 11 ++-- .../java/org/ray/runtime/task/TaskSpec.java | 16 ++++-- src/ray/gcs/format/gcs.fbs | 5 +- src/ray/raylet/task_spec.cc | 14 ++--- src/ray/raylet/task_test.cc | 51 +++++++++++++++++++ 6 files changed, 74 insertions(+), 25 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 01f8dbd12..fbd03bf10 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -390,7 +390,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { actor.increaseTaskCounter(), actor.getNewActorHandles().toArray(new UniqueId[0]), ArgumentsBuilder.wrap(args, language == TaskLanguage.PYTHON), - returnIds, + numReturns, resources, language, functionDescriptor diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java index b4bfa5a7f..01b9e4675 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java @@ -154,6 +154,7 @@ public class RayletClientImpl implements RayletClient { UniqueId actorId = UniqueId.fromByteBuffer(info.actorIdAsByteBuffer()); UniqueId actorHandleId = UniqueId.fromByteBuffer(info.actorHandleIdAsByteBuffer()); int actorCounter = info.actorCounter(); + int numReturns = info.numReturns(); // Deserialize new actor handles UniqueId[] newActorHandles = 
IdUtil.getUniqueIdsFromByteBuffer( @@ -177,8 +178,6 @@ public class RayletClientImpl implements RayletClient { args[i] = FunctionArg.passByValue(data); } } - // Deserialize return ids - ObjectId[] returnIds = IdUtil.getObjectIdsFromByteBuffer(info.returnsAsByteBuffer()); // Deserialize required resources; Map resources = new HashMap<>(); @@ -193,7 +192,7 @@ public class RayletClientImpl implements RayletClient { ); return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId, maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles, - args, returnIds, resources, TaskLanguage.JAVA, functionDescriptor); + args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor); } private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) { @@ -211,6 +210,7 @@ public class RayletClientImpl implements RayletClient { final int actorIdOffset = fbb.createString(task.actorId.toByteBuffer()); final int actorHandleIdOffset = fbb.createString(task.actorHandleId.toByteBuffer()); final int actorCounter = task.actorCounter; + final int numReturnsOffset = task.numReturns; // Serialize the new actor handles. int newActorHandlesOffset @@ -234,9 +234,6 @@ public class RayletClientImpl implements RayletClient { } int argsOffset = fbb.createVectorOfTables(argsOffsets); - // Serialize returns - int returnsOffset = fbb.createString(IdUtil.concatIds(task.returnIds)); - // Serialize required resources // The required_resources vector indicates the quantities of the different // resources required by this task. 
The index in this vector corresponds to @@ -292,7 +289,7 @@ public class RayletClientImpl implements RayletClient { actorCounter, newActorHandlesOffset, argsOffset, - returnsOffset, + numReturnsOffset, requiredResourcesOffset, requiredPlacementResourcesOffset, language, diff --git a/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java b/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java index 8a98e11c6..3473a9bdb 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java @@ -11,6 +11,7 @@ import org.ray.api.id.UniqueId; import org.ray.runtime.functionmanager.FunctionDescriptor; import org.ray.runtime.functionmanager.JavaFunctionDescriptor; import org.ray.runtime.functionmanager.PyFunctionDescriptor; +import org.ray.runtime.util.IdUtil; /** * Represents necessary information of a task for scheduling and executing. @@ -50,7 +51,10 @@ public class TaskSpec { // Task arguments. public final FunctionArg[] args; - // return ids + // number of return objects. + public final int numReturns; + + // returns ids. public final ObjectId[] returnIds; // The task's resource demands. 
@@ -86,7 +90,7 @@ public class TaskSpec { int actorCounter, UniqueId[] newActorHandles, FunctionArg[] args, - ObjectId[] returnIds, + int numReturns, Map resources, TaskLanguage language, FunctionDescriptor functionDescriptor) { @@ -101,7 +105,11 @@ public class TaskSpec { this.actorCounter = actorCounter; this.newActorHandles = newActorHandles; this.args = args; - this.returnIds = returnIds; + this.numReturns = numReturns; + returnIds = new ObjectId[numReturns]; + for (int i = 0; i < numReturns; ++i) { + returnIds[i] = IdUtil.computeReturnId(taskId, i + 1); + } this.resources = resources; this.language = language; if (language == TaskLanguage.JAVA) { @@ -145,7 +153,7 @@ public class TaskSpec { ", actorCounter=" + actorCounter + ", newActorHandles=" + Arrays.toString(newActorHandles) + ", args=" + Arrays.toString(args) + - ", returnIds=" + Arrays.toString(returnIds) + + ", numReturns=" + numReturns + ", resources=" + resources + ", language=" + language + ", functionDescriptor=" + functionDescriptor + diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index 7cf250247..b81f388d8 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -89,9 +89,8 @@ table TaskInfo { new_actor_handles: string; // Task arguments. args: [Arg]; - // Object IDs of return values. This is a long string that concatenate - // all of the return object IDs of this task. - returns: string; + // Number of return objects. + num_returns: int; // The required_resources vector indicates the quantities of the different // resources required by this task. required_resources: [ResourcePair]; diff --git a/src/ray/raylet/task_spec.cc b/src/ray/raylet/task_spec.cc index d4ec4f5c5..17a8b185f 100644 --- a/src/ray/raylet/task_spec.cc +++ b/src/ray/raylet/task_spec.cc @@ -92,20 +92,14 @@ TaskSpecification::TaskSpecification( arguments.push_back(argument->ToFlatbuffer(fbb)); } - // Generate return ids. 
- std::vector returns; - for (int64_t i = 1; i < num_returns + 1; ++i) { - returns.push_back(ObjectID::for_task_return(task_id, i)); - } - // Serialize the TaskSpecification. auto spec = CreateTaskInfo( fbb, to_flatbuf(fbb, driver_id), to_flatbuf(fbb, task_id), to_flatbuf(fbb, parent_task_id), parent_counter, to_flatbuf(fbb, actor_creation_id), to_flatbuf(fbb, actor_creation_dummy_object_id), max_actor_reconstructions, to_flatbuf(fbb, actor_id), to_flatbuf(fbb, actor_handle_id), actor_counter, - ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments), - ids_to_flatbuf(fbb, returns), map_to_flatbuf(fbb, required_resources), + ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments), num_returns, + map_to_flatbuf(fbb, required_resources), map_to_flatbuf(fbb, required_placement_resources), language, string_vec_to_flatbuf(fbb, function_descriptor)); fbb.Finish(spec); @@ -167,12 +161,12 @@ int64_t TaskSpecification::NumArgs() const { int64_t TaskSpecification::NumReturns() const { auto message = flatbuffers::GetRoot(spec_.data()); - return (message->returns()->size() / kUniqueIDSize); + return message->num_returns(); } ObjectID TaskSpecification::ReturnId(int64_t return_index) const { auto message = flatbuffers::GetRoot(spec_.data()); - return ids_from_flatbuf(*message->returns())[return_index]; + return ObjectID::for_task_return(TaskId(), return_index + 1); } bool TaskSpecification::ArgByRef(int64_t arg_index) const { diff --git a/src/ray/raylet/task_test.cc b/src/ray/raylet/task_test.cc index 03a4caff1..6d0cfa370 100644 --- a/src/ray/raylet/task_test.cc +++ b/src/ray/raylet/task_test.cc @@ -1,5 +1,6 @@ #include "gtest/gtest.h" +#include "ray/common/common_protocol.h" #include "ray/raylet/task_spec.h" namespace ray { @@ -47,6 +48,56 @@ TEST(IdPropertyTest, TestIdProperty) { ASSERT_TRUE(ObjectID::nil().is_nil()); } +TEST(TaskSpecTest, TaskInfoSize) { + std::vector references = {ObjectID::from_random(), ObjectID::from_random()}; + auto arguments_1 
= std::make_shared(references); + std::string one_arg("This is an value argument."); + auto arguments_2 = std::make_shared( + reinterpret_cast(one_arg.c_str()), one_arg.size()); + std::vector> task_arguments({arguments_1, arguments_2}); + auto task_id = TaskID::from_random(); + { + flatbuffers::FlatBufferBuilder fbb; + std::vector> arguments; + for (auto &argument : task_arguments) { + arguments.push_back(argument->ToFlatbuffer(fbb)); + } + // General task. + auto spec = CreateTaskInfo( + fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id), + to_flatbuf(fbb, TaskID::from_random()), 0, to_flatbuf(fbb, ActorID::nil()), + to_flatbuf(fbb, ObjectID::nil()), 0, to_flatbuf(fbb, ActorID::nil()), + to_flatbuf(fbb, ActorHandleID::nil()), 0, + ids_to_flatbuf(fbb, std::vector()), fbb.CreateVector(arguments), 1, + map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}), Language::PYTHON, + string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"})); + fbb.Finish(spec); + RAY_LOG(ERROR) << "Ordinary task info size: " << fbb.GetSize(); + } + + { + flatbuffers::FlatBufferBuilder fbb; + std::vector> arguments; + for (auto &argument : task_arguments) { + arguments.push_back(argument->ToFlatbuffer(fbb)); + } + // Actor task. 
+ auto spec = CreateTaskInfo( + fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id), + to_flatbuf(fbb, TaskID::from_random()), 10, + to_flatbuf(fbb, ActorID::from_random()), to_flatbuf(fbb, ObjectID::from_random()), + 10000000, to_flatbuf(fbb, ActorID::from_random()), + to_flatbuf(fbb, ActorHandleID::from_random()), 20, + ids_to_flatbuf(fbb, std::vector( + {ObjectID::from_random(), ObjectID::from_random()})), + fbb.CreateVector(arguments), 2, map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}), + Language::PYTHON, + string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"})); + fbb.Finish(spec); + RAY_LOG(ERROR) << "Actor task info size: " << fbb.GetSize(); + } +} + } // namespace raylet } // namespace ray