diff --git a/.gitignore b/.gitignore index 91189b6f9..05d24aeee 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ /src/ray/gcs/format/*_generated.h /src/ray/object_manager/format/*_generated.h /src/ray/raylet/format/*_generated.h +/java/runtime/src/main/java/org/ray/runtime/generated/* # Modin source files /python/ray/modin diff --git a/java/checkstyle-suppressions.xml b/java/checkstyle-suppressions.xml index 042233225..d7c3b7755 100644 --- a/java/checkstyle-suppressions.xml +++ b/java/checkstyle-suppressions.xml @@ -9,6 +9,5 @@ - - + diff --git a/java/modify_generated_java_flatbuffers_files.py b/java/modify_generated_java_flatbuffers_files.py new file mode 100644 index 000000000..c1b723f25 --- /dev/null +++ b/java/modify_generated_java_flatbuffers_files.py @@ -0,0 +1,55 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import sys + +""" +This script is used for modifying the generated java flatbuffer +files for the reason: The package declaration in Java is different +from python and C++, and there is no option in the flatc command +to specify package(namepsace) for Java specially. + +USAGE: + python modify_generated_java_flatbuffers_file.py RAY_HOME + +RAY_HOME: The root directory of Ray project. +""" + +# constants declarations +PACKAGE_DECLARATION = "package org.ray.runtime.generated;" + + +def add_new_line(file, line_num, text): + with open(file, "r") as file_handler: + lines = file_handler.readlines() + if (line_num <= 0) or (line_num > len(lines) + 1): + return False + + lines.insert(line_num - 1, text + os.linesep) + with open(file, "w") as file_handler: + for line in lines: + file_handler.write(line) + + return True + + +def add_package_declarations(generated_root_path): + file_names = os.listdir(generated_root_path) + for file_name in file_names: + if not file_name.endswith(".java"): + continue + full_name = os.path.join(generated_root_path, file_name) + success = add_new_line(full_name, 2, PACKAGE_DECLARATION) + if not success: + raise RuntimeError("Failed to add package declarations, " + "file name is %s" % full_name) + + +if __name__ == "__main__": + ray_home = sys.argv[1] + root_path = os.path.join( + ray_home, + "java/runtime/src/main/java/org/ray/runtime/generated") + add_package_declarations(root_path) diff --git a/java/runtime/src/main/java/org/ray/runtime/generated/Arg.java b/java/runtime/src/main/java/org/ray/runtime/generated/Arg.java deleted file mode 100644 index 158321052..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/generated/Arg.java +++ /dev/null @@ -1,58 +0,0 @@ -// automatically generated by the FlatBuffers compiler, do not modify -package org.ray.runtime.generated; - -import com.google.flatbuffers.FlatBufferBuilder; -import com.google.flatbuffers.Table; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -@SuppressWarnings("unused") -public final class Arg extends Table { - public static Arg getRootAsArg(ByteBuffer _bb) { return getRootAsArg(_bb, new Arg()); } - public static Arg getRootAsArg(ByteBuffer _bb, Arg obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } - public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } - public Arg __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } - - public String objectIds(int j) { int o = __offset(4); return o != 0 ? __string(__vector(o) + j * 4) : null; } - public int objectIdsLength() { int o = __offset(4); return o != 0 ? __vector_len(o) : 0; } - public String data() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer dataAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } - public ByteBuffer dataInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } - - public static int createArg(FlatBufferBuilder builder, - int object_idsOffset, - int dataOffset) { - builder.startObject(2); - Arg.addData(builder, dataOffset); - Arg.addObjectIds(builder, object_idsOffset); - return Arg.endArg(builder); - } - - public static void startArg(FlatBufferBuilder builder) { builder.startObject(2); } - public static void addObjectIds(FlatBufferBuilder builder, int objectIdsOffset) { builder.addOffset(0, objectIdsOffset, 0); } - public static int createObjectIdsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startObjectIdsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addData(FlatBufferBuilder builder, int dataOffset) { builder.addOffset(1, dataOffset, 0); } - public static int endArg(FlatBufferBuilder builder) { - int o = builder.endObject(); - return o; - } - - //this is manually added to avoid encoding/decoding cost as our object id is a byte array - // instead of a string - public ByteBuffer objectIdAsByteBuffer(int j) { - int o = __offset(4); - if (o == 0) { - return null; - } - - int offset = __vector(o) + j * 4; - offset += bb.getInt(offset); - ByteBuffer src = bb.duplicate().order(ByteOrder.LITTLE_ENDIAN); - int length = src.getInt(offset); - src.position(offset + 4); - src.limit(offset + 4 + length); - return src; - } -} - diff --git a/java/runtime/src/main/java/org/ray/runtime/generated/ClientTableData.java b/java/runtime/src/main/java/org/ray/runtime/generated/ClientTableData.java deleted file mode 100644 index 383c45733..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/generated/ClientTableData.java +++ /dev/null @@ -1,79 +0,0 @@ -package org.ray.runtime.generated; -// automatically generated by the FlatBuffers compiler, do not modify - -import java.nio.*; -import java.lang.*; -import com.google.flatbuffers.*; - -@SuppressWarnings("unused") -public final class ClientTableData extends Table { - public static ClientTableData getRootAsClientTableData(ByteBuffer _bb) { return getRootAsClientTableData(_bb, new ClientTableData()); } - public static ClientTableData getRootAsClientTableData(ByteBuffer _bb, ClientTableData obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } - public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } - public ClientTableData __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } - - public String clientId() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer clientIdAsByteBuffer() { return __vector_as_bytebuffer(4, 1); } - public ByteBuffer clientIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); } - public String nodeManagerAddress() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer nodeManagerAddressAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } - public ByteBuffer nodeManagerAddressInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } - public String rayletSocketName() { int o = __offset(8); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer rayletSocketNameAsByteBuffer() { return __vector_as_bytebuffer(8, 1); } - public ByteBuffer rayletSocketNameInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); } - public String objectStoreSocketName() { int o = __offset(10); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer objectStoreSocketNameAsByteBuffer() { return __vector_as_bytebuffer(10, 1); } - public ByteBuffer objectStoreSocketNameInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 10, 1); } - public int nodeManagerPort() { int o = __offset(12); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public int objectManagerPort() { int o = __offset(14); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public boolean isInsertion() { int o = __offset(16); return o != 0 ? 0!=bb.get(o + bb_pos) : false; } - public String resourcesTotalLabel(int j) { int o = __offset(18); return o != 0 ? __string(__vector(o) + j * 4) : null; } - public int resourcesTotalLabelLength() { int o = __offset(18); return o != 0 ? __vector_len(o) : 0; } - public double resourcesTotalCapacity(int j) { int o = __offset(20); return o != 0 ? bb.getDouble(__vector(o) + j * 8) : 0; } - public int resourcesTotalCapacityLength() { int o = __offset(20); return o != 0 ? __vector_len(o) : 0; } - public ByteBuffer resourcesTotalCapacityAsByteBuffer() { return __vector_as_bytebuffer(20, 8); } - public ByteBuffer resourcesTotalCapacityInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 20, 8); } - - public static int createClientTableData(FlatBufferBuilder builder, - int client_idOffset, - int node_manager_addressOffset, - int raylet_socket_nameOffset, - int object_store_socket_nameOffset, - int node_manager_port, - int object_manager_port, - boolean is_insertion, - int resources_total_labelOffset, - int resources_total_capacityOffset) { - builder.startObject(9); - ClientTableData.addResourcesTotalCapacity(builder, resources_total_capacityOffset); - ClientTableData.addResourcesTotalLabel(builder, resources_total_labelOffset); - ClientTableData.addObjectManagerPort(builder, object_manager_port); - ClientTableData.addNodeManagerPort(builder, node_manager_port); - ClientTableData.addObjectStoreSocketName(builder, object_store_socket_nameOffset); - ClientTableData.addRayletSocketName(builder, raylet_socket_nameOffset); - ClientTableData.addNodeManagerAddress(builder, node_manager_addressOffset); - ClientTableData.addClientId(builder, client_idOffset); - ClientTableData.addIsInsertion(builder, is_insertion); - return ClientTableData.endClientTableData(builder); - } - - public static void startClientTableData(FlatBufferBuilder builder) { builder.startObject(9); } - public static void addClientId(FlatBufferBuilder builder, int clientIdOffset) { builder.addOffset(0, clientIdOffset, 0); } - public static void addNodeManagerAddress(FlatBufferBuilder builder, int nodeManagerAddressOffset) { builder.addOffset(1, nodeManagerAddressOffset, 0); } - public static void addRayletSocketName(FlatBufferBuilder builder, int rayletSocketNameOffset) { builder.addOffset(2, rayletSocketNameOffset, 0); } - public static void addObjectStoreSocketName(FlatBufferBuilder builder, int objectStoreSocketNameOffset) { builder.addOffset(3, objectStoreSocketNameOffset, 0); } - public static void addNodeManagerPort(FlatBufferBuilder builder, int nodeManagerPort) { builder.addInt(4, nodeManagerPort, 0); } - public static void addObjectManagerPort(FlatBufferBuilder builder, int objectManagerPort) { builder.addInt(5, objectManagerPort, 0); } - public static void addIsInsertion(FlatBufferBuilder builder, boolean isInsertion) { builder.addBoolean(6, isInsertion, false); } - public static void addResourcesTotalLabel(FlatBufferBuilder builder, int resourcesTotalLabelOffset) { builder.addOffset(7, resourcesTotalLabelOffset, 0); } - public static int createResourcesTotalLabelVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startResourcesTotalLabelVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addResourcesTotalCapacity(FlatBufferBuilder builder, int resourcesTotalCapacityOffset) { builder.addOffset(8, resourcesTotalCapacityOffset, 0); } - public static int createResourcesTotalCapacityVector(FlatBufferBuilder builder, double[] data) { builder.startVector(8, data.length, 8); for (int i = data.length - 1; i >= 0; i--) builder.addDouble(data[i]); return builder.endVector(); } - public static void startResourcesTotalCapacityVector(FlatBufferBuilder builder, int numElems) { builder.startVector(8, numElems, 8); } - public static int endClientTableData(FlatBufferBuilder builder) { - int o = builder.endObject(); - return o; - } -} - diff --git a/java/runtime/src/main/java/org/ray/runtime/generated/Language.java b/java/runtime/src/main/java/org/ray/runtime/generated/Language.java deleted file mode 100644 index 34604374d..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/generated/Language.java +++ /dev/null @@ -1,14 +0,0 @@ -// automatically generated by the FlatBuffers compiler, do not modify - -package org.ray.runtime.generated; - -public final class Language { - private Language() { } - public static final int PYTHON = 0; - public static final int CPP = 1; - public static final int JAVA = 2; - - public static final String[] names = { "PYTHON", "CPP", "JAVA", }; - - public static String name(int e) { return names[e]; } -} diff --git a/java/runtime/src/main/java/org/ray/runtime/generated/ResourcePair.java b/java/runtime/src/main/java/org/ray/runtime/generated/ResourcePair.java deleted file mode 100644 index 5620e221e..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/generated/ResourcePair.java +++ /dev/null @@ -1,38 +0,0 @@ -// automatically generated by the FlatBuffers compiler, do not modify - -package org.ray.runtime.generated; - -import java.nio.*; -import java.lang.*; -import com.google.flatbuffers.*; - -@SuppressWarnings("unused") -public final class ResourcePair extends Table { - public static ResourcePair getRootAsResourcePair(ByteBuffer _bb) { return getRootAsResourcePair(_bb, new ResourcePair()); } - public static ResourcePair getRootAsResourcePair(ByteBuffer _bb, ResourcePair obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } - public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } - public ResourcePair __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } - - public String key() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); } - public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); } - public double value() { int o = __offset(6); return o != 0 ? bb.getDouble(o + bb_pos) : 0.0; } - - public static int createResourcePair(FlatBufferBuilder builder, - int keyOffset, - double value) { - builder.startObject(2); - ResourcePair.addValue(builder, value); - ResourcePair.addKey(builder, keyOffset); - return ResourcePair.endResourcePair(builder); - } - - public static void startResourcePair(FlatBufferBuilder builder) { builder.startObject(2); } - public static void addKey(FlatBufferBuilder builder, int keyOffset) { builder.addOffset(0, keyOffset, 0); } - public static void addValue(FlatBufferBuilder builder, double value) { builder.addDouble(1, value, 0.0); } - public static int endResourcePair(FlatBufferBuilder builder) { - int o = builder.endObject(); - return o; - } -} - diff --git a/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java b/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java deleted file mode 100644 index cf413a913..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java +++ /dev/null @@ -1,155 +0,0 @@ -// automatically generated by the FlatBuffers compiler, do not modify -package org.ray.runtime.generated; - -import com.google.flatbuffers.FlatBufferBuilder; -import com.google.flatbuffers.Table; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -@SuppressWarnings("unused") -public final class TaskInfo extends Table { - public static TaskInfo getRootAsTaskInfo(ByteBuffer _bb) { return getRootAsTaskInfo(_bb, new TaskInfo()); } - public static TaskInfo getRootAsTaskInfo(ByteBuffer _bb, TaskInfo obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } - public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } - public TaskInfo __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } - - public String driverId() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer driverIdAsByteBuffer() { return __vector_as_bytebuffer(4, 1); } - public ByteBuffer driverIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); } - public String taskId() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer taskIdAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } - public ByteBuffer taskIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } - public String parentTaskId() { int o = __offset(8); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer parentTaskIdAsByteBuffer() { return __vector_as_bytebuffer(8, 1); } - public ByteBuffer parentTaskIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); } - public int parentCounter() { int o = __offset(10); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public String actorCreationId() { int o = __offset(12); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer actorCreationIdAsByteBuffer() { return __vector_as_bytebuffer(12, 1); } - public ByteBuffer actorCreationIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 12, 1); } - public String actorCreationDummyObjectId() { int o = __offset(14); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer actorCreationDummyObjectIdAsByteBuffer() { return __vector_as_bytebuffer(14, 1); } - public ByteBuffer actorCreationDummyObjectIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 14, 1); } - public int maxActorReconstructions() { int o = __offset(16); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public String actorId() { int o = __offset(18); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer actorIdAsByteBuffer() { return __vector_as_bytebuffer(18, 1); } - public ByteBuffer actorIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 18, 1); } - public String actorHandleId() { int o = __offset(20); return o != 0 ? __string(o + bb_pos) : null; } - public ByteBuffer actorHandleIdAsByteBuffer() { return __vector_as_bytebuffer(20, 1); } - public ByteBuffer actorHandleIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 20, 1); } - public int actorCounter() { int o = __offset(22); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public boolean isActorCheckpointMethod() { int o = __offset(24); return o != 0 ? 0!=bb.get(o + bb_pos) : false; } - public String newActorHandles(int j) { int o = __offset(26); return o != 0 ? __string(__vector(o) + j * 4) : null; } - public int newActorHandlesLength() { int o = __offset(26); return o != 0 ? __vector_len(o) : 0; } - public Arg args(int j) { return args(new Arg(), j); } - public Arg args(Arg obj, int j) { int o = __offset(28); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } - public int argsLength() { int o = __offset(28); return o != 0 ? __vector_len(o) : 0; } - public String returns(int j) { int o = __offset(30); return o != 0 ? __string(__vector(o) + j * 4) : null; } - public int returnsLength() { int o = __offset(30); return o != 0 ? __vector_len(o) : 0; } - public ResourcePair requiredResources(int j) { return requiredResources(new ResourcePair(), j); } - public ResourcePair requiredResources(ResourcePair obj, int j) { int o = __offset(32); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } - public int requiredResourcesLength() { int o = __offset(32); return o != 0 ? __vector_len(o) : 0; } - public ResourcePair requiredPlacementResources(int j) { return requiredPlacementResources(new ResourcePair(), j); } - public ResourcePair requiredPlacementResources(ResourcePair obj, int j) { int o = __offset(34); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } - public int requiredPlacementResourcesLength() { int o = __offset(34); return o != 0 ? __vector_len(o) : 0; } - public int language() { int o = __offset(36); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public String functionDescriptor(int j) { int o = __offset(38); return o != 0 ? __string(__vector(o) + j * 4) : null; } - public int functionDescriptorLength() { int o = __offset(38); return o != 0 ? __vector_len(o) : 0; } - - public static int createTaskInfo(FlatBufferBuilder builder, - int driver_idOffset, - int task_idOffset, - int parent_task_idOffset, - int parent_counter, - int actor_creation_idOffset, - int actor_creation_dummy_object_idOffset, - int max_actor_reconstructions, - int actor_idOffset, - int actor_handle_idOffset, - int actor_counter, - boolean is_actor_checkpoint_method, - int new_actor_handlesOffset, - int argsOffset, - int returnsOffset, - int required_resourcesOffset, - int required_placement_resourcesOffset, - int language, - int function_descriptorOffset) { - builder.startObject(18); - TaskInfo.addFunctionDescriptor(builder, function_descriptorOffset); - TaskInfo.addLanguage(builder, language); - TaskInfo.addRequiredPlacementResources(builder, required_placement_resourcesOffset); - TaskInfo.addRequiredResources(builder, required_resourcesOffset); - TaskInfo.addReturns(builder, returnsOffset); - TaskInfo.addArgs(builder, argsOffset); - TaskInfo.addNewActorHandles(builder, new_actor_handlesOffset); - TaskInfo.addActorCounter(builder, actor_counter); - TaskInfo.addActorHandleId(builder, actor_handle_idOffset); - TaskInfo.addActorId(builder, actor_idOffset); - TaskInfo.addMaxActorReconstructions(builder, max_actor_reconstructions); - TaskInfo.addActorCreationDummyObjectId(builder, actor_creation_dummy_object_idOffset); - TaskInfo.addActorCreationId(builder, actor_creation_idOffset); - TaskInfo.addParentCounter(builder, parent_counter); - TaskInfo.addParentTaskId(builder, parent_task_idOffset); - TaskInfo.addTaskId(builder, task_idOffset); - TaskInfo.addDriverId(builder, driver_idOffset); - TaskInfo.addIsActorCheckpointMethod(builder, is_actor_checkpoint_method); - return TaskInfo.endTaskInfo(builder); - } - - public static void startTaskInfo(FlatBufferBuilder builder) { builder.startObject(18); } - public static void addDriverId(FlatBufferBuilder builder, int driverIdOffset) { builder.addOffset(0, driverIdOffset, 0); } - public static void addTaskId(FlatBufferBuilder builder, int taskIdOffset) { builder.addOffset(1, taskIdOffset, 0); } - public static void addParentTaskId(FlatBufferBuilder builder, int parentTaskIdOffset) { builder.addOffset(2, parentTaskIdOffset, 0); } - public static void addParentCounter(FlatBufferBuilder builder, int parentCounter) { builder.addInt(3, parentCounter, 0); } - public static void addActorCreationId(FlatBufferBuilder builder, int actorCreationIdOffset) { builder.addOffset(4, actorCreationIdOffset, 0); } - public static void addActorCreationDummyObjectId(FlatBufferBuilder builder, int actorCreationDummyObjectIdOffset) { builder.addOffset(5, actorCreationDummyObjectIdOffset, 0); } - public static void addMaxActorReconstructions(FlatBufferBuilder builder, int maxActorReconstructions) { builder.addInt(6, maxActorReconstructions, 0); } - public static void addActorId(FlatBufferBuilder builder, int actorIdOffset) { builder.addOffset(7, actorIdOffset, 0); } - public static void addActorHandleId(FlatBufferBuilder builder, int actorHandleIdOffset) { builder.addOffset(8, actorHandleIdOffset, 0); } - public static void addActorCounter(FlatBufferBuilder builder, int actorCounter) { builder.addInt(9, actorCounter, 0); } - public static void addIsActorCheckpointMethod(FlatBufferBuilder builder, boolean isActorCheckpointMethod) { builder.addBoolean(10, isActorCheckpointMethod, false); } - public static void addNewActorHandles(FlatBufferBuilder builder, int newActorHandlesOffset) { builder.addOffset(11, newActorHandlesOffset, 0); } - public static int createNewActorHandlesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startNewActorHandlesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addArgs(FlatBufferBuilder builder, int argsOffset) { builder.addOffset(12, argsOffset, 0); } - public static int createArgsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startArgsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addReturns(FlatBufferBuilder builder, int returnsOffset) { builder.addOffset(13, returnsOffset, 0); } - public static int createReturnsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startReturnsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addRequiredResources(FlatBufferBuilder builder, int requiredResourcesOffset) { builder.addOffset(14, requiredResourcesOffset, 0); } - public static int createRequiredResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startRequiredResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addRequiredPlacementResources(FlatBufferBuilder builder, int requiredPlacementResourcesOffset) { builder.addOffset(15, requiredPlacementResourcesOffset, 0); } - public static int createRequiredPlacementResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startRequiredPlacementResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addLanguage(FlatBufferBuilder builder, int language) { builder.addInt(16, language, 0); } - public static void addFunctionDescriptor(FlatBufferBuilder builder, int functionDescriptorOffset) { builder.addOffset(17, functionDescriptorOffset, 0); } - public static int createFunctionDescriptorVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startFunctionDescriptorVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static int endTaskInfo(FlatBufferBuilder builder) { - int o = builder.endObject(); - return o; - } - - /** This is manually added to avoid encoding/decoding cost as our object - * id is a byte array instead of a string. - * This function is error-prone. If the fields before `returns` changed, - * the offset number should be changed accordingly. - * TODO(yuhguo): fix this error-prone funciton. - */ - public ByteBuffer returnsAsByteBuffer(int j) { - int o = __offset(30); - if (o == 0) { - return null; - } - - int offset = __vector(o) + j * 4; - offset += bb.getInt(offset); - ByteBuffer src = bb.duplicate().order(ByteOrder.LITTLE_ENDIAN); - int length = src.getInt(offset); - src.position(offset + 4); - src.limit(offset + 4 + length); - return src; - } -} diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java index b804a481f..2a58ea051 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java @@ -146,10 +146,12 @@ public class RayletClientImpl implements RayletClient { FunctionArg[] args = new FunctionArg[info.argsLength()]; for (int i = 0; i < info.argsLength(); i++) { Arg arg = info.args(i); - if (arg.objectIdsLength() > 0) { - Preconditions.checkArgument(arg.objectIdsLength() == 1, - "This arg has more than one id: {}", arg.objectIdsLength()); - UniqueId id = UniqueId.fromByteBuffer(arg.objectIdAsByteBuffer(0)); + + int objectIdsLength = arg.objectIdsAsByteBuffer().remaining() / UniqueId.LENGTH; + if (objectIdsLength > 0) { + Preconditions.checkArgument(objectIdsLength == 1, + "This arg has more than one id: {}", objectIdsLength); + UniqueId id = UniqueIdUtil.getUniqueIdsFromByteBuffer(arg.objectIdsAsByteBuffer())[0]; args[i] = FunctionArg.passByReference(id); } else { ByteBuffer lbb = arg.dataAsByteBuffer(); @@ -160,10 +162,8 @@ public class RayletClientImpl implements RayletClient { } } // Deserialize return ids - UniqueId[] returnIds = new UniqueId[info.returnsLength()]; - for (int i = 0; i < info.returnsLength(); i++) { - returnIds[i] = UniqueId.fromByteBuffer(info.returnsAsByteBuffer(i)); - } + UniqueId[] returnIds = UniqueIdUtil.getUniqueIdsFromByteBuffer(info.returnsAsByteBuffer()); + // Deserialize required resources; Map resources = new HashMap<>(); for (int i = 0; i < info.requiredResourcesLength(); i++) { @@ -175,8 +175,8 @@ public class RayletClientImpl implements RayletClient { info.functionDescriptor(0), info.functionDescriptor(1), info.functionDescriptor(2) ); return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId, - maxActorReconstructions, actorId, actorHandleId, actorCounter, args, returnIds, resources, - functionDescriptor); + maxActorReconstructions, actorId, actorHandleId, actorCounter, + args, returnIds, resources, functionDescriptor); } private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) { @@ -194,22 +194,21 @@ public class RayletClientImpl implements RayletClient { final int actorIdOffset = fbb.createString(task.actorId.toByteBuffer()); final int actorHandleIdOffset = fbb.createString(task.actorHandleId.toByteBuffer()); final int actorCounter = task.actorCounter; + // Serialize the new actor handles. - int[] newActorHandlesOffsets = new int[task.newActorHandles.length]; - for (int i = 0; i < newActorHandlesOffsets.length; i++) { - newActorHandlesOffsets[i] = fbb.createString(task.newActorHandles[i].toByteBuffer()); - } - int newActorHandlesOffset = fbb.createVectorOfTables(newActorHandlesOffsets); + int newActorHandlesOffset + = fbb.createString(UniqueIdUtil.concatUniqueIds(task.newActorHandles)); + // Serialize args int[] argsOffsets = new int[task.args.length]; for (int i = 0; i < argsOffsets.length; i++) { int objectIdOffset = 0; int dataOffset = 0; if (task.args[i].id != null) { - int[] idOffsets = new int[]{fbb.createString(task.args[i].id.toByteBuffer())}; - objectIdOffset = fbb.createVectorOfTables(idOffsets); + objectIdOffset = fbb.createString( + UniqueIdUtil.concatUniqueIds(new UniqueId[] {task.args[i].id})); } else { - objectIdOffset = fbb.createVectorOfTables(new int[0]); + objectIdOffset = fbb.createString(""); } if (task.args[i].data != null) { dataOffset = fbb.createString(ByteBuffer.wrap(task.args[i].data)); @@ -217,13 +216,10 @@ public class RayletClientImpl implements RayletClient { argsOffsets[i] = Arg.createArg(fbb, objectIdOffset, dataOffset); } int argsOffset = fbb.createVectorOfTables(argsOffsets); + // Serialize returns - int returnCount = task.returnIds.length; - int[] returnsOffsets = new int[returnCount]; - for (int k = 0; k < returnCount; k++) { - returnsOffsets[k] = fbb.createString(task.returnIds[k].toByteBuffer()); - } - int returnsOffset = fbb.createVectorOfTables(returnsOffsets); + int returnsOffset = fbb.createString(UniqueIdUtil.concatUniqueIds(task.returnIds)); + // Serialize required resources // The required_resources vector indicates the quantities of the different // resources required by this task. The index in this vector corresponds to diff --git a/java/runtime/src/main/java/org/ray/runtime/task/FunctionArg.java b/java/runtime/src/main/java/org/ray/runtime/task/FunctionArg.java index 9d7502bca..19a16e872 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/FunctionArg.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/FunctionArg.java @@ -4,7 +4,6 @@ import org.ray.api.id.UniqueId; /** * Represents a function argument in task spec. - * * Either `id` or `data` should be null, when id is not null, this argument will be * passed by reference, otherwise it will be passed by value. */ diff --git a/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java index 021279924..009d5a410 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/UniqueIdUtil.java @@ -1,10 +1,10 @@ package org.ray.runtime.util; +import com.google.common.base.Preconditions; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; import java.util.List; - import org.ray.api.id.UniqueId; @@ -94,4 +94,44 @@ public class UniqueIdUtil { } return ids; } + + /** + * Get unique IDs from concatenated ByteBuffer. + * + * @param byteBufferOfIds The ByteBuffer concatenated from IDs. + * @return The array of unique IDs. + */ + public static UniqueId[] getUniqueIdsFromByteBuffer(ByteBuffer byteBufferOfIds) { + Preconditions.checkArgument(byteBufferOfIds != null); + + byte[] bytesOfIds = new byte[byteBufferOfIds.remaining()]; + byteBufferOfIds.get(bytesOfIds, 0, byteBufferOfIds.remaining()); + + int count = bytesOfIds.length / UniqueId.LENGTH; + UniqueId[] uniqueIds = new UniqueId[count]; + + for (int i = 0; i < count; ++i) { + byte[] id = new byte[UniqueId.LENGTH]; + System.arraycopy(bytesOfIds, i * UniqueId.LENGTH, id, 0, UniqueId.LENGTH); + uniqueIds[i] = UniqueId.fromByteBuffer(ByteBuffer.wrap(id)); + } + + return uniqueIds; + } + + /** + * Concatenate IDs to a ByteBuffer. + * + * @param ids The array of IDs that will be concatenated. + * @return A ByteBuffer that contains bytes of concatenated IDs. + */ + public static ByteBuffer concatUniqueIds(UniqueId[] ids) { + byte[] bytesOfIds = new byte[UniqueId.LENGTH * ids.length]; + for (int i = 0; i < ids.length; ++i) { + System.arraycopy(ids[i].getBytes(), 0, bytesOfIds, + i * UniqueId.LENGTH, UniqueId.LENGTH); + } + + return ByteBuffer.wrap(bytesOfIds); + } } diff --git a/java/test/src/main/java/org/ray/api/test/UniqueIdTest.java b/java/test/src/main/java/org/ray/api/test/UniqueIdTest.java index 2fd47057d..61443ed18 100644 --- a/java/test/src/main/java/org/ray/api/test/UniqueIdTest.java +++ b/java/test/src/main/java/org/ray/api/test/UniqueIdTest.java @@ -2,6 +2,7 @@ package org.ray.api.test; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; import javax.xml.bind.DatatypeConverter; import org.junit.Assert; import org.junit.Test; @@ -79,4 +80,20 @@ public class UniqueIdTest { Assert.assertEquals("FCFCFDFE123456789ABCDEF123456789ABCDEF00".toLowerCase(), putId.toString()); } + @Test + public void testUniqueIdsAndByteBufferInterConversion() { + final int len = 5; + UniqueId[] ids = new UniqueId[len]; + for (int i = 0; i < len; ++i) { + ids[i] = UniqueId.randomId(); + } + + ByteBuffer temp = UniqueIdUtil.concatUniqueIds(ids); + UniqueId[] res = UniqueIdUtil.getUniqueIdsFromByteBuffer(temp); + + for (int i = 0; i < len; ++i) { + Assert.assertEquals(ids[i], res[i]); + } + } + } diff --git a/src/ray/common/common_protocol.cc b/src/ray/common/common_protocol.cc index bcbfcc5f0..f5ed40af5 100644 --- a/src/ray/common/common_protocol.cc +++ b/src/ray/common/common_protocol.cc @@ -24,6 +24,32 @@ const std::vector from_flatbuf( return object_ids; } +const std::vector object_ids_from_flatbuf( + const flatbuffers::String &string) { + const auto &object_ids = string_from_flatbuf(string); + std::vector ret; + RAY_CHECK(object_ids.size() % kUniqueIDSize == 0); + auto count = object_ids.size() / kUniqueIDSize; + + for (size_t i = 0; i < count; ++i) { + auto pos = static_cast(kUniqueIDSize * i); + const auto &id = object_ids.substr(pos, kUniqueIDSize); + ret.push_back(ray::ObjectID::from_binary(id)); + } + + return ret; +} + +flatbuffers::Offset object_ids_to_flatbuf( + flatbuffers::FlatBufferBuilder &fbb, const std::vector &object_ids) { + std::string result; + for (const auto &id : object_ids) { + result += id.binary(); + } + + return fbb.CreateString(result); +} + flatbuffers::Offset>> to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, ray::ObjectID object_ids[], int64_t num_objects) { diff --git a/src/ray/common/common_protocol.h b/src/ray/common/common_protocol.h index de8f27fc4..bea4a5b92 100644 --- a/src/ray/common/common_protocol.h +++ b/src/ray/common/common_protocol.h @@ -28,6 +28,23 @@ ray::ObjectID from_flatbuf(const flatbuffers::String &string); const std::vector from_flatbuf( const flatbuffers::Vector> &vector); +/// Convert a flatbuffer of string that concatenated +/// object IDs to a vector of object IDs. +/// +/// @param vector The flatbuffer vector. +/// @return The vector of object IDs. +const std::vector object_ids_from_flatbuf( + const flatbuffers::String &string); + +/// Convert a vector of object IDs to a flatbuffer string. +/// The IDs are concatenated to a string with binary. +/// +/// @param fbb Reference to the flatbuffer builder. +/// @param object_ids The vector of object IDs. +/// @return Flatbuffer string of concatenated IDs. +flatbuffers::Offset object_ids_to_flatbuf( + flatbuffers::FlatBufferBuilder &fbb, const std::vector &object_ids); + /// Convert an array of object IDs to a flatbuffer vector of strings. /// /// @param fbb Reference to the flatbuffer builder. diff --git a/src/ray/gcs/CMakeLists.txt b/src/ray/gcs/CMakeLists.txt index 0fd007737..f09c89f99 100644 --- a/src/ray/gcs/CMakeLists.txt +++ b/src/ray/gcs/CMakeLists.txt @@ -23,14 +23,25 @@ add_custom_command( VERBATIM) add_custom_target(gen_gcs_fbs DEPENDS ${GCS_FBS_OUTPUT_FILES}) +set(RAY_HOME ${CMAKE_CURRENT_LIST_DIR}/../../..) # Generate Python bindings for the flatbuffers objects. -set(PYTHON_OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../python/ray/core/generated/) +set(PYTHON_OUTPUT_DIR ${RAY_HOME}/python/ray/core/generated/) add_custom_command( TARGET gen_gcs_fbs COMMAND ${FLATBUFFERS_COMPILER} -p -o ${PYTHON_OUTPUT_DIR} ${GCS_FBS_SRC} DEPENDS ${FBS_DEPENDS} - COMMENT "Running flatc compiler on ${GCS_FBS_SRC}" + COMMENT "Running flatc compiler on ${GCS_FBS_SRC} for python" + VERBATIM) + +# Generate Java bindings for the flatbuffers objects. +set(JAVA_OUTPUT_DIR ${RAY_HOME}/java/runtime/src/main/java/org/ray/runtime/generated/) +add_custom_command( + TARGET gen_gcs_fbs + COMMAND ${FLATBUFFERS_COMPILER} -j -o ${JAVA_OUTPUT_DIR} ${GCS_FBS_SRC} + COMMAND ${PYTHON_EXECUTABLE} ${RAY_HOME}/java/modify_generated_java_flatbuffers_files.py ${RAY_HOME} + DEPENDS ${FBS_DEPENDS} + COMMENT "Running flatc compiler on ${GCS_FBS_SRC} for Java" VERBATIM) ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread ${Boost_SYSTEM_LIBRARY}) diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index 74f721b6e..6522a334c 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -42,7 +42,8 @@ table Arg { // object ID in this list which represents the object that is being passed. // However to support reducers in a MapReduce workload, we also support // passing multiple object IDs for each argument. - object_ids: [string]; + // Note that this is a long string that concatenate all of the object IDs. + object_ids: string; // Data for pass-by-value arguments. data: string; } @@ -76,11 +77,13 @@ table TaskInfo { // If this is an actor task, then this will be populated with all of the new // actor handles that were forked from this handle since the last task on // this handle was submitted. - new_actor_handles: [string]; + // Note that this is a long string that concatenate all of the new_actor_handle IDs. + new_actor_handles: string; // Task arguments. args: [Arg]; - // Object IDs of return values. - returns: [string]; + // Object IDs of return values. This is a long string that concatenate + // all of the return object IDs of this task. + returns: string; // The required_resources vector indicates the quantities of the different // resources required by this task. required_resources: [ResourcePair]; diff --git a/src/ray/raylet/task_spec.cc b/src/ray/raylet/task_spec.cc index 60d6d04e9..5f183c7be 100644 --- a/src/ray/raylet/task_spec.cc +++ b/src/ray/raylet/task_spec.cc @@ -17,7 +17,7 @@ TaskArgumentByReference::TaskArgumentByReference(const std::vector &re flatbuffers::Offset TaskArgumentByReference::ToFlatbuffer( flatbuffers::FlatBufferBuilder &fbb) const { - return CreateArg(fbb, to_flatbuf(fbb, references_)); + return CreateArg(fbb, object_ids_to_flatbuf(fbb, references_)); } TaskArgumentByValue::TaskArgumentByValue(const uint8_t *value, size_t length) { @@ -28,7 +28,7 @@ flatbuffers::Offset TaskArgumentByValue::ToFlatbuffer( flatbuffers::FlatBufferBuilder &fbb) const { auto arg = fbb.CreateString(reinterpret_cast(value_.data()), value_.size()); - auto empty_ids = fbb.CreateVectorOfStrings({}); + const auto &empty_ids = fbb.CreateString(""); return CreateArg(fbb, empty_ids, arg); } @@ -88,11 +88,10 @@ TaskSpecification::TaskSpecification( arguments.push_back(argument->ToFlatbuffer(fbb)); } - // Add return object IDs. - std::vector> returns; - for (int64_t i = 1; i < num_returns + 1; i++) { - ObjectID return_id = ComputeReturnId(task_id, i); - returns.push_back(to_flatbuf(fbb, return_id)); + // Generate return ids. + std::vector returns; + for (int64_t i = 1; i < num_returns + 1; ++i) { + returns.push_back(ComputeReturnId(task_id, i)); } // Serialize the TaskSpecification. @@ -101,8 +100,8 @@ TaskSpecification::TaskSpecification( to_flatbuf(fbb, parent_task_id), parent_counter, to_flatbuf(fbb, actor_creation_id), to_flatbuf(fbb, actor_creation_dummy_object_id), max_actor_reconstructions, to_flatbuf(fbb, actor_id), to_flatbuf(fbb, actor_handle_id), actor_counter, false, - to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments), - fbb.CreateVector(returns), map_to_flatbuf(fbb, required_resources), + object_ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments), + object_ids_to_flatbuf(fbb, returns), map_to_flatbuf(fbb, required_resources), map_to_flatbuf(fbb, required_placement_resources), language, string_vec_to_flatbuf(fbb, function_descriptor)); fbb.Finish(spec); @@ -164,12 +163,12 @@ int64_t TaskSpecification::NumArgs() const { int64_t TaskSpecification::NumReturns() const { auto message = flatbuffers::GetRoot(spec_.data()); - return message->returns()->size(); + return (message->returns()->size() / kUniqueIDSize); } ObjectID TaskSpecification::ReturnId(int64_t return_index) const { auto message = flatbuffers::GetRoot(spec_.data()); - return from_flatbuf(*message->returns()->Get(return_index)); + return object_ids_from_flatbuf(*message->returns())[return_index]; } bool TaskSpecification::ArgByRef(int64_t arg_index) const { @@ -179,12 +178,14 @@ bool TaskSpecification::ArgByRef(int64_t arg_index) const { int TaskSpecification::ArgIdCount(int64_t arg_index) const { auto message = flatbuffers::GetRoot(spec_.data()); auto ids = message->args()->Get(arg_index)->object_ids(); - return ids->size(); + return (ids->size() / kUniqueIDSize); } ObjectID TaskSpecification::ArgId(int64_t arg_index, int64_t id_index) const { auto message = flatbuffers::GetRoot(spec_.data()); - return from_flatbuf(*message->args()->Get(arg_index)->object_ids()->Get(id_index)); + const auto &object_ids = + object_ids_from_flatbuf(*message->args()->Get(arg_index)->object_ids()); + return object_ids[id_index]; } const uint8_t *TaskSpecification::ArgVal(int64_t arg_index) const { @@ -266,7 +267,7 @@ ObjectID TaskSpecification::ActorDummyObject() const { std::vector TaskSpecification::NewActorHandles() const { auto message = flatbuffers::GetRoot(spec_.data()); - return from_flatbuf(*message->new_actor_handles()); + return object_ids_from_flatbuf(*message->new_actor_handles()); } } // namespace raylet