diff --git a/BUILD.bazel b/BUILD.bazel index 69db22310..0114b89a9 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1496,6 +1496,7 @@ cc_binary( linkstatic = 1, deps = [ "//:core_worker_lib", + "//:global_state_accessor_lib", "//:src/ray/ray_exported_symbols.lds", "//:src/ray/ray_version_script.lds", "//streaming:jni", diff --git a/java/generate_jni_header_files.sh b/java/generate_jni_header_files.sh index 7e8c73e1b..55bca4bce 100755 --- a/java/generate_jni_header_files.sh +++ b/java/generate_jni_header_files.sh @@ -33,13 +33,14 @@ EOF rm -f $file } -generate_one org.ray.runtime.RayNativeRuntime -generate_one org.ray.runtime.task.NativeTaskSubmitter -generate_one org.ray.runtime.context.NativeWorkerContext -generate_one org.ray.runtime.actor.NativeRayActor -generate_one org.ray.runtime.object.NativeObjectStore -generate_one org.ray.runtime.task.NativeTaskExecutor +generate_one io.ray.runtime.RayNativeRuntime +generate_one io.ray.runtime.task.NativeTaskSubmitter +generate_one io.ray.runtime.context.NativeWorkerContext +generate_one io.ray.runtime.actor.NativeRayActor +generate_one io.ray.runtime.object.NativeObjectStore +generate_one io.ray.runtime.task.NativeTaskExecutor +generate_one io.ray.runtime.gcs.GlobalStateAccessor # Remove empty files -rm -f org_ray_runtime_RayNativeRuntime_AsyncContext.h -rm -f org_ray_runtime_task_NativeTaskExecutor_NativeActorContext.h \ No newline at end of file +rm -f io_ray_runtime_RayNativeRuntime_AsyncContext.h +rm -f io_ray_runtime_task_NativeTaskExecutor_NativeActorContext.h \ No newline at end of file diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java new file mode 100644 index 000000000..c7efe94db --- /dev/null +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GlobalStateAccessor.java @@ -0,0 +1,85 @@ +package io.ray.runtime.gcs; + +import com.google.common.base.Preconditions; +import java.util.List; + +/** + * `GlobalStateAccessor` is used for accessing information from GCS. + * + **/ +public class GlobalStateAccessor { + // NOTE(lingxuan.zlx): this is a singleton, it can not be changed during a Ray session. + // Native pointer to the C++ GcsStateAccessor. + private Long globalStateAccessorNativePointer = 0L; + private static GlobalStateAccessor globalStateAccessor; + + public static synchronized GlobalStateAccessor getInstance(String redisAddress, + String redisPassword) { + if (null == globalStateAccessor) { + globalStateAccessor = new GlobalStateAccessor(redisAddress, redisPassword); + } + return globalStateAccessor; + } + + public static synchronized void destroyInstance() { + if (null != globalStateAccessor) { + globalStateAccessor.destroyGlobalStateAccessor(); + } + } + + private GlobalStateAccessor(String redisAddress, String redisPassword) { + globalStateAccessorNativePointer = + nativeCreateGlobalStateAccessor(redisAddress, redisPassword); + Preconditions.checkState(globalStateAccessorNativePointer != 0, + "Global state accessor native pointer must not be 0."); + connect(); + } + + private boolean connect() { + return this.nativeConnect(globalStateAccessorNativePointer); + } + + /** + * @return A list of job info with JobInfo protobuf schema. + */ + public List getAllJobInfo() { + // Fetch a job list with protobuf bytes format from GCS. + synchronized (GlobalStateAccessor.class) { + Preconditions.checkState(globalStateAccessorNativePointer != 0); + return this.nativeGetAllJobInfo(globalStateAccessorNativePointer); + } + } + + /** + * @return A list of node info with GcsNodeInfo protobuf schema. + */ + public List getAllNodeInfo() { + // Fetch a node list with protobuf bytes format from GCS. + synchronized (GlobalStateAccessor.class) { + Preconditions.checkState(globalStateAccessorNativePointer != 0); + return this.nativeGetAllNodeInfo(globalStateAccessorNativePointer); + } + } + + private void destroyGlobalStateAccessor() { + synchronized (GlobalStateAccessor.class) { + if (0 == globalStateAccessorNativePointer) { + return; + } + this.nativeDestroyGlobalStateAccessor(globalStateAccessorNativePointer); + globalStateAccessorNativePointer = 0L; + } + } + + private native long nativeCreateGlobalStateAccessor(String redisAddress, String redisPassword); + + private native void nativeDestroyGlobalStateAccessor(long nativePtr); + + private native boolean nativeConnect(long nativePtr); + + private native void nativeDisconnect(long nativePtr); + + private native List nativeGetAllJobInfo(long nativePtr); + + private native List nativeGetAllNodeInfo(long nativePtr); +} diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h index 4930a2052..1b9e7d006 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h @@ -25,7 +25,7 @@ extern "C" { * Class: io_ray_runtime_RayNativeRuntime * Method: nativeInitialize * Signature: - * (ILjava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;[BLorg/ray/runtime/gcs/GcsClientOptions;ILjava/lang/String;Ljava/util/Map;)V + * (ILjava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;[BLio/ray/runtime/gcs/GcsClientOptions;ILjava/lang/String;Ljava/util/Map;)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( JNIEnv *, jclass, jint, jstring, jint, jstring, jstring, jstring, jbyteArray, jobject, @@ -34,7 +34,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( /* * Class: io_ray_runtime_RayNativeRuntime * Method: nativeRunTaskExecutor - * Signature: (Lorg/ray/runtime/task/TaskExecutor;)V + * Signature: (Lio/ray/runtime/task/TaskExecutor;)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeRunTaskExecutor(JNIEnv *, jclass, jobject); @@ -45,7 +45,7 @@ Java_io_ray_runtime_RayNativeRuntime_nativeRunTaskExecutor(JNIEnv *, jclass, job * Signature: ()V */ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeShutdown(JNIEnv *, - jclass); + jclass); /* * Class: io_ray_runtime_RayNativeRuntime @@ -61,9 +61,9 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeSetResource( * Signature: ([BZ)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeKillActor(JNIEnv *, - jclass, - jbyteArray, - jboolean); + jclass, + jbyteArray, + jboolean); /* * Class: io_ray_runtime_RayNativeRuntime diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_context_NativeWorkerContext.cc b/src/ray/core_worker/lib/java/io_ray_runtime_context_NativeWorkerContext.cc index e83dbba09..8107b4746 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_context_NativeWorkerContext.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_context_NativeWorkerContext.cc @@ -25,7 +25,7 @@ extern "C" { JNIEXPORT jint JNICALL Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskType(JNIEnv *env, - jclass) { + jclass) { auto task_spec = ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentTask(); RAY_CHECK(task_spec) << "Current task is not set."; @@ -34,7 +34,7 @@ Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskType(JNIEnv JNIEXPORT jobject JNICALL Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskId(JNIEnv *env, - jclass) { + jclass) { const ray::TaskID &task_id = ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentTaskID(); return IdToJavaByteBuffer(env, task_id); @@ -42,7 +42,7 @@ Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskId(JNIEnv *e JNIEXPORT jobject JNICALL Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentJobId(JNIEnv *env, - jclass) { + jclass) { const auto &job_id = ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentJobID(); return IdToJavaByteBuffer(env, job_id); @@ -50,7 +50,7 @@ Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentJobId(JNIEnv *en JNIEXPORT jobject JNICALL Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentWorkerId(JNIEnv *env, - jclass) { + jclass) { const auto &worker_id = ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetWorkerID(); return IdToJavaByteBuffer(env, worker_id); @@ -58,7 +58,7 @@ Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentWorkerId(JNIEnv JNIEXPORT jobject JNICALL Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentActorId(JNIEnv *env, - jclass) { + jclass) { const auto &actor_id = ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID(); return IdToJavaByteBuffer(env, actor_id); diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_context_NativeWorkerContext.h b/src/ray/core_worker/lib/java/io_ray_runtime_context_NativeWorkerContext.h index f5476aebf..cb6005533 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_context_NativeWorkerContext.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_context_NativeWorkerContext.h @@ -28,7 +28,7 @@ extern "C" { */ JNIEXPORT jint JNICALL Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskType(JNIEnv *, - jclass); + jclass); /* * Class: io_ray_runtime_context_NativeWorkerContext @@ -53,7 +53,7 @@ Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentJobId(JNIEnv *, */ JNIEXPORT jobject JNICALL Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentWorkerId(JNIEnv *, - jclass); + jclass); /* * Class: io_ray_runtime_context_NativeWorkerContext @@ -61,8 +61,7 @@ Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentWorkerId(JNIEnv * Signature: ()Ljava/nio/ByteBuffer; */ JNIEXPORT jobject JNICALL -Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentActorId(JNIEnv *, - jclass); +Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentActorId(JNIEnv *, jclass); #ifdef __cplusplus } diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc new file mode 100644 index 000000000..6e0ed6cb1 --- /dev/null +++ b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.cc @@ -0,0 +1,81 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h" +#include +#include "ray/core_worker/common.h" +#include "ray/core_worker/lib/java/jni_utils.h" +#include "ray/gcs/gcs_client/global_state_accessor.h" + +#ifdef __cplusplus +extern "C" { +#endif +JNIEXPORT jlong JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeCreateGlobalStateAccessor( + JNIEnv *env, jobject o, jstring j_redis_address, jstring j_redis_passowrd) { + std::string redis_address = JavaStringToNativeString(env, j_redis_address); + std::string redis_password = JavaStringToNativeString(env, j_redis_passowrd); + ray::gcs::GlobalStateAccessor *gcs_accessor = + new ray::gcs::GlobalStateAccessor(redis_address, redis_password); + return reinterpret_cast(gcs_accessor); +} + +JNIEXPORT void JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeDestroyGlobalStateAccessor( + JNIEnv *env, jobject o, jlong gcs_accessor_ptr) { + auto *gcs_accessor = + reinterpret_cast(gcs_accessor_ptr); + delete gcs_accessor; +} + +JNIEXPORT jboolean JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeConnect( + JNIEnv *env, jobject o, jlong gcs_accessor_ptr) { + auto *gcs_accessor = + reinterpret_cast(gcs_accessor_ptr); + return gcs_accessor->Connect(); +} + +JNIEXPORT void JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeDisconnect( + JNIEnv *env, jobject o, jlong gcs_accessor_ptr) { + auto *gcs_accessor = + reinterpret_cast(gcs_accessor_ptr); + gcs_accessor->Disconnect(); +} + +JNIEXPORT jobject JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllJobInfo( + JNIEnv *env, jobject o, jlong gcs_accessor_ptr) { + auto *gcs_accessor = + reinterpret_cast(gcs_accessor_ptr); + auto job_info_list = gcs_accessor->GetAllJobInfo(); + return NativeVectorToJavaList( + env, job_info_list, [](JNIEnv *env, const std::string &str) { + return NativeStringToJavaByteArray(env, str); + }); +} + +JNIEXPORT jobject JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *env, jobject o, + jlong gcs_accessor_ptr) { + auto *gcs_accessor = + reinterpret_cast(gcs_accessor_ptr); + auto node_info_list = gcs_accessor->GetAllNodeInfo(); + return NativeVectorToJavaList( + env, node_info_list, [](JNIEnv *env, const std::string &str) { + return NativeStringToJavaByteArray(env, str); + }); +} + +#ifdef __cplusplus +} +#endif diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h new file mode 100644 index 000000000..3d9c60c3b --- /dev/null +++ b/src/ray/core_worker/lib/java/io_ray_runtime_gcs_GlobalStateAccessor.h @@ -0,0 +1,81 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class io_ray_runtime_gcs_GlobalStateAccessor */ + +#ifndef _Included_io_ray_runtime_gcs_GlobalStateAccessor +#define _Included_io_ray_runtime_gcs_GlobalStateAccessor +#ifdef __cplusplus +extern "C" { +#endif +/* + * Class: io_ray_runtime_gcs_GlobalStateAccessor + * Method: nativeCreateGlobalStateAccessor + * Signature: (Ljava/lang/String;Ljava/lang/String;)J + */ +JNIEXPORT jlong JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeCreateGlobalStateAccessor(JNIEnv *, + jobject, + jstring, + jstring); + +/* + * Class: io_ray_runtime_gcs_GlobalStateAccessor + * Method: nativeDestroyGlobalStateAccessor + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeDestroyGlobalStateAccessor(JNIEnv *, + jobject, + jlong); + +/* + * Class: io_ray_runtime_gcs_GlobalStateAccessor + * Method: nativeConnect + * Signature: (J)Z + */ +JNIEXPORT jboolean JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeConnect(JNIEnv *, jobject, jlong); + +/* + * Class: io_ray_runtime_gcs_GlobalStateAccessor + * Method: nativeDisconnect + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeDisconnect(JNIEnv *, jobject, jlong); + +/* + * Class: io_ray_runtime_gcs_GlobalStateAccessor + * Method: nativeGetAllJobInfo + * Signature: (J)Ljava/util/List; + */ +JNIEXPORT jobject JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllJobInfo(JNIEnv *, jobject, jlong); + +/* + * Class: io_ray_runtime_gcs_GlobalStateAccessor + * Method: nativeGetAllNodeInfo + * Signature: (J)Ljava/util/List; + */ +JNIEXPORT jobject JNICALL +Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *, jobject, + jlong); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h index 31fe70b23..183ccc4fa 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h @@ -24,7 +24,7 @@ extern "C" { /* * Class: io_ray_runtime_object_NativeObjectStore * Method: nativePut - * Signature: (Lorg/ray/runtime/object/NativeRayObject;)[B + * Signature: (Lio/ray/runtime/object/NativeRayObject;)[B */ JNIEXPORT jbyteArray JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativePut__Lio_ray_runtime_object_NativeRayObject_2( @@ -33,7 +33,7 @@ Java_io_ray_runtime_object_NativeObjectStore_nativePut__Lio_ray_runtime_object_N /* * Class: io_ray_runtime_object_NativeObjectStore * Method: nativePut - * Signature: ([BLorg/ray/runtime/object/NativeRayObject;)V + * Signature: ([BLio/ray/runtime/object/NativeRayObject;)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativePut___3BLio_ray_runtime_object_NativeRayObject_2( @@ -44,8 +44,10 @@ Java_io_ray_runtime_object_NativeObjectStore_nativePut___3BLio_ray_runtime_objec * Method: nativeGet * Signature: (Ljava/util/List;J)Ljava/util/List; */ -JNIEXPORT jobject JNICALL -Java_io_ray_runtime_object_NativeObjectStore_nativeGet(JNIEnv *, jclass, jobject, jlong); +JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeGet(JNIEnv *, + jclass, + jobject, + jlong); /* * Class: io_ray_runtime_object_NativeObjectStore diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.cc index d0d05bb88..fabc01fe1 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.cc @@ -27,8 +27,7 @@ extern "C" { using ray::ClientID; JNIEXPORT jbyteArray JNICALL -Java_io_ray_runtime_task_NativeTaskExecutor_nativePrepareCheckpoint(JNIEnv *env, - jclass) { +Java_io_ray_runtime_task_NativeTaskExecutor_nativePrepareCheckpoint(JNIEnv *env, jclass) { auto &core_worker = ray::CoreWorkerProcess::GetCoreWorker(); const auto &actor_id = core_worker.GetWorkerContext().GetCurrentActorID(); const auto &task_spec = core_worker.GetWorkerContext().GetCurrentTask(); diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.h b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.h index 26dfab594..c657ae323 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.h @@ -25,7 +25,7 @@ extern "C" { * Class: io_ray_runtime_task_NativeTaskSubmitter * Method: nativeSubmitTask * Signature: - * (Lorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;ILorg/ray/api/options/CallOptions;)Ljava/util/List; + * (Lio/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;ILio/ray/api/options/CallOptions;)Ljava/util/List; */ JNIEXPORT jobject JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitTask( JNIEnv *, jclass, jobject, jobject, jint, jobject); @@ -34,23 +34,23 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSub * Class: io_ray_runtime_task_NativeTaskSubmitter * Method: nativeCreateActor * Signature: - * (Lorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;Lorg/ray/api/options/ActorCreationOptions;)[B + * (Lio/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;Lio/ray/api/options/ActorCreationOptions;)[B */ JNIEXPORT jbyteArray JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeCreateActor(JNIEnv *, jclass, jobject, - jobject, jobject); + jobject, jobject); /* * Class: io_ray_runtime_task_NativeTaskSubmitter * Method: nativeSubmitActorTask * Signature: - * ([BLorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;ILorg/ray/api/options/CallOptions;)Ljava/util/List; + * ([BLio/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;ILio/ray/api/options/CallOptions;)Ljava/util/List; */ JNIEXPORT jobject JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(JNIEnv *, jclass, - jbyteArray, jobject, - jobject, jint, - jobject); + jbyteArray, jobject, + jobject, jint, + jobject); #ifdef __cplusplus } diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index d7a6e9463..5e76c2fd5 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -232,6 +232,14 @@ inline jobject IdToJavaByteBuffer(JNIEnv *env, const ID &id) { reinterpret_cast(const_cast(id.Data())), id.Size()); } +/// Convert C++ String to a Java ByteArray. +inline jobject NativeStringToJavaByteArray(JNIEnv *env, const std::string &str) { + jbyteArray array = env->NewByteArray(str.size()); + env->SetByteArrayRegion(array, 0, str.size(), + reinterpret_cast(str.c_str())); + return array; +} + /// Convert a Java String to C++ std::string. inline std::string JavaStringToNativeString(JNIEnv *env, jstring jstr) { const char *c_str = env->GetStringUTFChars(jstr, nullptr); diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index 1ee4cee2a..d1247574a 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -53,8 +53,13 @@ GlobalStateAccessor::~GlobalStateAccessor() { } bool GlobalStateAccessor::Connect() { - is_connected_ = true; - return gcs_client_->Connect(*io_service_).ok(); + if (!is_connected_) { + is_connected_ = true; + return gcs_client_->Connect(*io_service_).ok(); + } else { + RAY_LOG(DEBUG) << "Duplicated connection for GlobalStateAccessor."; + return true; + } } void GlobalStateAccessor::Disconnect() { @@ -67,32 +72,26 @@ void GlobalStateAccessor::Disconnect() { std::vector GlobalStateAccessor::GetAllJobInfo() { std::vector job_table_data; std::promise promise; - auto on_done = [&job_table_data, &promise]( - const Status &status, const std::vector &result) { - RAY_CHECK_OK(status); - for (auto &data : result) { - job_table_data.push_back(data.SerializeAsString()); - } - promise.set_value(true); - }; - RAY_CHECK_OK(gcs_client_->Jobs().AsyncGetAll(on_done)); + RAY_CHECK_OK(gcs_client_->Jobs().AsyncGetAll( + TransformForAccessorCallback(job_table_data, promise))); promise.get_future().get(); return job_table_data; } +std::vector GlobalStateAccessor::GetAllNodeInfo() { + std::vector node_table_data; + std::promise promise; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll( + TransformForAccessorCallback(node_table_data, promise))); + promise.get_future().get(); + return node_table_data; +} + std::vector GlobalStateAccessor::GetAllProfileInfo() { std::vector profile_table_data; std::promise promise; - auto on_done = [&profile_table_data, &promise]( - const Status &status, - const std::vector &result) { - RAY_CHECK_OK(status); - for (auto &data : result) { - profile_table_data.push_back(data.SerializeAsString()); - } - promise.set_value(true); - }; - RAY_CHECK_OK(gcs_client_->Stats().AsyncGetAll(on_done)); + RAY_CHECK_OK(gcs_client_->Stats().AsyncGetAll( + TransformForAccessorCallback(profile_table_data, promise))); promise.get_future().get(); return profile_table_data; } diff --git a/src/ray/gcs/gcs_client/global_state_accessor.h b/src/ray/gcs/gcs_client/global_state_accessor.h index 3d4e9b9df..d33f63d89 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.h +++ b/src/ray/gcs/gcs_client/global_state_accessor.h @@ -15,6 +15,7 @@ #ifndef RAY_GCS_GLOBAL_STATE_ACCESSOR_H #define RAY_GCS_GLOBAL_STATE_ACCESSOR_H +#include "ray/rpc/server_call.h" #include "service_based_gcs_client.h" namespace ray { @@ -51,6 +52,11 @@ class GlobalStateAccessor { /// protobuf function. std::vector GetAllJobInfo(); + /// Get all node information from GCS. + /// + /// \return A list of `GcsNodeInfo` objects serialized in protobuf format. + std::vector GetAllNodeInfo(); + /// Get information of all profiles from GCS Service. /// /// \return All profile info. To support multi-language, we serialized each @@ -73,6 +79,21 @@ class GlobalStateAccessor { /// protobuf function. std::unique_ptr GetObjectInfo(const ObjectID &object_id); + private: + /// MultiItem tranformation helper in template style. + /// + /// \return MultiItemCallback within in rpc type DATA. + template + MultiItemCallback TransformForAccessorCallback(std::vector &data_vec, + std::promise &promise) { + return [&data_vec, &promise](const Status &status, const std::vector &result) { + RAY_CHECK_OK(status); + std::transform(result.begin(), result.end(), std::back_inserter(data_vec), + [](const DATA &data) { return data.SerializeAsString(); }); + promise.set_value(true); + }; + } + private: /// Whether this client is connected to gcs server. bool is_connected_{false}; diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index eb91b1cd1..82f03e8a9 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -110,6 +110,28 @@ TEST_F(GlobalStateAccessorTest, TestJobTable) { ASSERT_EQ(global_state_->GetAllJobInfo().size(), job_count); } +TEST_F(GlobalStateAccessorTest, TestNodeTable) { + int node_count = 100; + ASSERT_EQ(global_state_->GetAllNodeInfo().size(), 0); + // It's useful to check if index value will be marked as address suffix. + for (int index = 0; index < node_count; ++index) { + auto node_table_data = + Mocker::GenNodeInfo(index, std::string("127.0.0.") + std::to_string(index)); + std::promise promise; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncRegister( + *node_table_data, [&promise](Status status) { promise.set_value(status.ok()); })); + WaitReady(promise.get_future(), timeout_ms_); + } + auto node_table = global_state_->GetAllNodeInfo(); + ASSERT_EQ(node_table.size(), node_count); + for (int index = 0; index < node_count; ++index) { + rpc::GcsNodeInfo node_data; + node_data.ParseFromString(node_table[index]); + ASSERT_EQ(node_data.node_manager_address(), + std::string("127.0.0.") + std::to_string(node_data.node_manager_port())); + } +} + TEST_F(GlobalStateAccessorTest, TestProfileTable) { int profile_count = 100; ASSERT_EQ(global_state_->GetAllProfileInfo().size(), 0); diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index e7e0f9eeb..890b54811 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -63,11 +63,12 @@ struct Mocker { return request; } - static std::shared_ptr GenNodeInfo(uint16_t port = 0) { + static std::shared_ptr GenNodeInfo( + uint16_t port = 0, const std::string address = "127.0.0.1") { auto node = std::make_shared(); node->set_node_id(ClientID::FromRandom().Binary()); node->set_node_manager_port(port); - node->set_node_manager_address("127.0.0.1"); + node->set_node_manager_address(address); return node; }