diff --git a/java/cli/src/main/java/org/ray/cli/RayCli.java b/java/cli/src/main/java/org/ray/cli/RayCli.java index cdff1d9f1..2c422b943 100644 --- a/java/cli/src/main/java/org/ray/cli/RayCli.java +++ b/java/cli/src/main/java/org/ray/cli/RayCli.java @@ -16,9 +16,8 @@ import org.ray.spi.PathConfig; import org.ray.spi.RemoteFunctionManager; import org.ray.spi.StateStoreProxy; import org.ray.spi.impl.NativeRemoteFunctionManager; -import org.ray.spi.impl.NonRayletStateStoreProxyImpl; -import org.ray.spi.impl.RayletStateStoreProxyImpl; import org.ray.spi.impl.RedisClient; +import org.ray.spi.impl.StateStoreProxyImpl; import org.ray.util.FileUtil; import org.ray.util.config.ConfigReader; import org.ray.util.logger.RayLog; @@ -148,9 +147,7 @@ public class RayCli { KeyValueStoreLink kvStore = new RedisClient(); kvStore.setAddr(cmdSubmit.redisAddress); - StateStoreProxy stateStoreProxy = params.use_raylet - ? new RayletStateStoreProxyImpl(kvStore) - : new NonRayletStateStoreProxyImpl(kvStore); + StateStoreProxy stateStoreProxy = new StateStoreProxyImpl(kvStore); stateStoreProxy.initializeGlobalState(); RemoteFunctionManager functionManager = new NativeRemoteFunctionManager(kvStore); diff --git a/java/common/src/main/java/org/ray/util/config/ConfigReader.java b/java/common/src/main/java/org/ray/util/config/ConfigReader.java index db477e2bd..05958136b 100644 --- a/java/common/src/main/java/org/ray/util/config/ConfigReader.java +++ b/java/common/src/main/java/org/ray/util/config/ConfigReader.java @@ -324,9 +324,8 @@ public class ConfigReader { String sv = getStringValue(section, fld.getName(), defaultFldValue.toString(), comment); Object v; try { - v = fld.getType().getConstructor(new Class[] {String.class}).newInstance(sv); - } catch (NoSuchMethodException | SecurityException | InstantiationException - | InvocationTargetException e) { + v = UniqueId.fromHexString(sv); + } catch (IllegalArgumentException e) { System.err.println( section + "." + fld.getName() + "'s format (" + sv + ") is invalid, default to " + defaultFldValue.toString()); diff --git a/java/prepare.sh b/java/prepare.sh index bbea0fdca..807301a74 100755 --- a/java/prepare.sh +++ b/java/prepare.sh @@ -44,9 +44,6 @@ fi declare -a nativeBinaries=( "./src/common/thirdparty/redis/src/redis-server" "./src/plasma/plasma_store_server" - "./src/plasma/plasma_manager" - "./src/local_scheduler/local_scheduler" - "./src/global_scheduler/global_scheduler" "./src/ray/raylet/raylet" "./src/ray/raylet/raylet_monitor" ) diff --git a/java/ray.config.ini b/java/ray.config.ini index bb9a1fd1b..8f63695cd 100644 --- a/java/ray.config.ini +++ b/java/ray.config.ini @@ -44,8 +44,6 @@ driver_id = 0123456789abcdef0123456789abcdef01234567 redis_port = 34111 -num_local_schedulers = 1 - max_submit_task_buffer_size_bytes = 51200 default_first_check_timeout_ms = 1000 @@ -60,8 +58,6 @@ deploy = false onebox_delay_seconds_before_run_app_logic = 0 -use_raylet = true - static_resources = CPU:4,GPU:0 ; java class which main is served as the driver in a java worker @@ -122,9 +118,6 @@ driver_args = redis_server = %CONFIG_FILE_DIR%/../build/src/common/thirdparty/redis/src/redis-server redis_module = %CONFIG_FILE_DIR%/../build/src/common/redis_module/libray_redis_module.so store = %CONFIG_FILE_DIR%/../build/src/plasma/plasma_store_server -store_manager = %CONFIG_FILE_DIR%/../build/src/plasma/plasma_manager -local_scheduler = %CONFIG_FILE_DIR%/../build/src/local_scheduler/local_scheduler -global_scheduler = %CONFIG_FILE_DIR%/../build/src/global_scheduler/global_scheduler raylet = %CONFIG_FILE_DIR%/../build/src/ray/raylet/raylet python_dir = %CONFIG_FILE_DIR%/../build/ java_runtime_rewritten_jars_dir = @@ -135,9 +128,6 @@ java_jnilib_paths = ray.java.path.jni.package redis_server = %CONFIG_FILE_DIR%/../build/src/common/thirdparty/redis/src/redis-server redis_module = %CONFIG_FILE_DIR%/../build/src/common/redis_module/libray_redis_module.so store = %CONFIG_FILE_DIR%/../build/src/plasma/plasma_store_server -store_manager = %CONFIG_FILE_DIR%/../build/src/plasma/plasma_manager -local_scheduler = %CONFIG_FILE_DIR%/../build/src/local_scheduler/local_scheduler -global_scheduler = %CONFIG_FILE_DIR%/../build/src/global_scheduler/global_scheduler raylet = %CONFIG_FILE_DIR%/../build/src/ray/raylet/raylet python_dir = %CONFIG_FILE_DIR%/../build/ java_runtime_rewritten_jars_dir = @@ -148,9 +138,6 @@ java_jnilib_paths = ray.java.path.jni.package redis_server = %CONFIG_FILE_DIR%/native/bin/redis-server redis_module = %CONFIG_FILE_DIR%/native/lib/libray_redis_module.so store = %CONFIG_FILE_DIR%/native/bin/plasma_store_server -store_manager = %CONFIG_FILE_DIR%/native/bin/plasma_manager -local_scheduler = %CONFIG_FILE_DIR%/native/bin/local_scheduler -global_scheduler = %CONFIG_FILE_DIR%/native/bin/global_scheduler raylet = %CONFIG_FILE_DIR%/native/bin/raylet python_dir = %CONFIG_FILE_DIR%/python java_runtime_rewritten_jars_dir = %CONFIG_FILE_DIR%/java/lib/ diff --git a/java/runtime-common/src/main/java/org/ray/core/AbstractRayRuntime.java b/java/runtime-common/src/main/java/org/ray/core/AbstractRayRuntime.java index 31ef679cf..3fbcdbc2d 100644 --- a/java/runtime-common/src/main/java/org/ray/core/AbstractRayRuntime.java +++ b/java/runtime-common/src/main/java/org/ray/core/AbstractRayRuntime.java @@ -129,12 +129,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { functions = new LocalFunctionManager(remoteLoader); localSchedulerClient = slink; - if (!params.use_raylet) { - objectStoreProxy = new ObjectStoreProxy(plink); - } else { - objectStoreProxy = new ObjectStoreProxy(plink, slink); - } - + objectStoreProxy = new ObjectStoreProxy(plink); worker = new Worker(this); } @@ -203,9 +198,6 @@ public abstract class AbstractRayRuntime implements RayRuntime { public void put(UniqueId objectId, T obj) { UniqueId taskId = WorkerContext.currentTask().taskId; RayLog.core.info("Putting object {}, for task {} ", objectId, taskId); - if (!params.use_raylet) { - localSchedulerClient.markTaskPutDependency(taskId, objectId); - } objectStoreProxy.put(objectId, obj, null); } @@ -227,11 +219,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { List> fetchBatches = splitIntoBatches(objectIds, params.worker_fetch_request_size); for (List batch : fetchBatches) { - if (!params.use_raylet) { - objectStoreProxy.fetch(batch); - } else { - localSchedulerClient.reconstructObjects(batch, true); - } + localSchedulerClient.reconstructObjects(batch, true); } // Get the objects. We initially try to get the objects immediately. @@ -256,16 +244,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { splitIntoBatches(unreadyList, params.worker_fetch_request_size); for (List batch : reconstructBatches) { - if (!params.use_raylet) { - for (UniqueId objectId : batch) { - localSchedulerClient.reconstructObject(objectId, false); - } - // Do another fetch for objects that aren't available locally yet, in case - // they were evicted since the last fetch. - objectStoreProxy.fetch(batch); - } else { - localSchedulerClient.reconstructObjects(batch, false); - } + localSchedulerClient.reconstructObjects(batch, false); } List> results = objectStoreProxy @@ -328,7 +307,7 @@ public abstract class AbstractRayRuntime implements RayRuntime { @Override public WaitResult wait(List> waitList, int numReturns, int timeoutMs) { - return objectStoreProxy.wait(waitList, numReturns, timeoutMs); + return localSchedulerClient.wait(waitList, numReturns, timeoutMs); } @Override diff --git a/java/runtime-common/src/main/java/org/ray/core/model/RayParameters.java b/java/runtime-common/src/main/java/org/ray/core/model/RayParameters.java index a17daa9d6..dc4c4beb2 100644 --- a/java/runtime-common/src/main/java/org/ray/core/model/RayParameters.java +++ b/java/runtime-common/src/main/java/org/ray/core/model/RayParameters.java @@ -28,21 +28,6 @@ public class RayParameters { @AConfig(comment = "object store rpc listen port") public int object_store_rpc_port = 32567; - @AConfig(comment = "object store manager name (e.g., /tmp/storeMgr1111") - public String object_store_manager_name = ""; - - @AConfig(comment = "object store manager rpc listen port") - public int object_store_manager_rpc_port = 33567; - - @AConfig(comment = "object store manager ray listen port") - public int object_store_manager_ray_listen_port = 33667; - - @AConfig(comment = "local scheduler name (e.g., /tmp/scheduler1111") - public String local_scheduler_name = ""; - - @AConfig(comment = "local scheduler rpc listen port") - public int local_scheduler_rpc_port = 34567; - @AConfig(comment = "driver ID when the worker is served as a driver") public UniqueId driver_id = UniqueId.NIL; @@ -58,30 +43,15 @@ public class RayParameters { @AConfig(comment = "redirect err and stdout to files for newly created processes") public boolean redirect = true; - @AConfig(comment = "whether to start the global scheduler") - public boolean include_global_scheduler = false; - @AConfig(comment = "whether to start redis shard server in addition to the primary server") public boolean start_redis_shards = false; @AConfig(comment = "whether to clean up the processes when there is a process start failure") public boolean cleanup = false; - @AConfig(comment = "whether to start workers from within the local schedulers") - public boolean start_workers_from_local_scheduler = true; - - @AConfig(comment = "number of cpus assigned to each local scheduler") - public int[] num_cpus = {}; - - @AConfig(comment = "number of gpus assigned to each local scheduler") - public int[] num_gpus = {}; - @AConfig(comment = "number of redis shard servers to be started") public int num_redis_shards = 0; - @AConfig(comment = "number of local schedulers to be started") - public int num_local_schedulers = 1; - @AConfig(comment = "whether this is a deployment in cluster") public boolean deploy = false; @@ -112,9 +82,6 @@ public class RayParameters { @AConfig(comment = "delay seconds under onebox before app logic for debugging") public int onebox_delay_seconds_before_run_app_logic = 0; - @AConfig(comment = "whether to use raylet") - public boolean use_raylet = false; - @AConfig(comment = "raylet socket name (e.g., /tmp/raylet1111") public String raylet_socket_name = ""; diff --git a/java/runtime-common/src/main/java/org/ray/spi/LocalSchedulerLink.java b/java/runtime-common/src/main/java/org/ray/spi/LocalSchedulerLink.java index aa741b1a2..b0b84c76e 100644 --- a/java/runtime-common/src/main/java/org/ray/spi/LocalSchedulerLink.java +++ b/java/runtime-common/src/main/java/org/ray/spi/LocalSchedulerLink.java @@ -1,6 +1,9 @@ package org.ray.spi; import java.util.List; + +import org.ray.api.RayObject; +import org.ray.api.WaitResult; import org.ray.api.id.UniqueId; import org.ray.spi.model.TaskSpec; @@ -13,17 +16,13 @@ public interface LocalSchedulerLink { TaskSpec getTask(); - void markTaskPutDependency(UniqueId taskId, UniqueId objectId); - - void reconstructObject(UniqueId objectId, boolean fetchOnly); - void reconstructObjects(List objectIds, boolean fetchOnly); void notifyUnblocked(); UniqueId generateTaskId(UniqueId driverId, UniqueId parentTaskId, int taskIndex); - List wait(byte[][] objectIds, int timeoutMs, int numReturns); + WaitResult wait(List> waitFor, int numReturns, int timeoutMs); void freePlasmaObjects(List objectIds, boolean localOnly); } diff --git a/java/runtime-common/src/main/java/org/ray/spi/ObjectStoreProxy.java b/java/runtime-common/src/main/java/org/ray/spi/ObjectStoreProxy.java index 25b78223f..dcbf0d8c6 100644 --- a/java/runtime-common/src/main/java/org/ray/spi/ObjectStoreProxy.java +++ b/java/runtime-common/src/main/java/org/ray/spi/ObjectStoreProxy.java @@ -18,17 +18,10 @@ import org.ray.util.exception.TaskExecutionException; public class ObjectStoreProxy { private final ObjectStoreLink store; - private final LocalSchedulerLink localSchedulerLink; private final int getTimeoutMs = 1000; public ObjectStoreProxy(ObjectStoreLink store) { this.store = store; - this.localSchedulerLink = null; - } - - public ObjectStoreProxy(ObjectStoreLink store, LocalSchedulerLink localSchedulerLink) { - this.store = store; - this.localSchedulerLink = localSchedulerLink; } public Pair get(UniqueId objectId, boolean isMetadata) @@ -89,39 +82,6 @@ public class ObjectStoreProxy { store.put(id.getBytes(), Serializer.encode(obj), Serializer.encode(metadata)); } - public WaitResult wait(List> waitfor, int numReturns, int timeout) { - List ids = new ArrayList<>(); - for (RayObject obj : waitfor) { - ids.add(obj.getId()); - } - List readys; - if (localSchedulerLink == null) { - readys = store.wait(getIdBytes(ids), timeout, numReturns); - } else { - readys = localSchedulerLink.wait(getIdBytes(ids), timeout, numReturns); - } - - List> readyList = new ArrayList<>(); - List> unreadyList = new ArrayList<>(); - for (RayObject obj : waitfor) { - if (readys.contains(obj.getId().getBytes())) { - readyList.add(obj); - } else { - unreadyList.add(obj); - } - } - - return new WaitResult<>(readyList, unreadyList); - } - - public void fetch(List objectIds) { - if (localSchedulerLink == null) { - store.fetch(getIdBytes(objectIds)); - } else { - localSchedulerLink.reconstructObjects(objectIds, true); - } - } - public enum GetStatus { SUCCESS, FAILED } diff --git a/java/runtime-common/src/main/java/org/ray/spi/PathConfig.java b/java/runtime-common/src/main/java/org/ray/spi/PathConfig.java index e8e0f1228..c7fa9ca8d 100644 --- a/java/runtime-common/src/main/java/org/ray/spi/PathConfig.java +++ b/java/runtime-common/src/main/java/org/ray/spi/PathConfig.java @@ -28,15 +28,6 @@ public class PathConfig { @AConfig(comment = "path to plasma storage") public String store; - @AConfig(comment = "path to plasma manager") - public String store_manager; - - @AConfig(comment = "path to local scheduler") - public String local_scheduler; - - @AConfig(comment = "path to global scheduler") - public String global_scheduler; - @AConfig(comment = "path to raylet") public String raylet; diff --git a/java/runtime-common/src/main/java/org/ray/spi/model/TaskSpec.java b/java/runtime-common/src/main/java/org/ray/spi/model/TaskSpec.java index 23f4e41ec..e8ef0321f 100644 --- a/java/runtime-common/src/main/java/org/ray/spi/model/TaskSpec.java +++ b/java/runtime-common/src/main/java/org/ray/spi/model/TaskSpec.java @@ -68,6 +68,14 @@ public class TaskSpec { this.createActorId = createActorId; this.resources = resources; this.cursorId = cursorId; + + if (!this.resources.containsKey(ResourceUtil.CPU_LITERAL)) { + this.resources.put(ResourceUtil.CPU_LITERAL, 0.0); + } + + if (!this.resources.containsKey(ResourceUtil.GPU_LITERAL)) { + this.resources.put(ResourceUtil.GPU_LITERAL, 0.0); + } } @Override diff --git a/java/runtime-dev/src/main/java/org/ray/spi/impl/MockLocalScheduler.java b/java/runtime-dev/src/main/java/org/ray/spi/impl/MockLocalScheduler.java index 0f60a8a3b..2ce21b1e1 100644 --- a/java/runtime-dev/src/main/java/org/ray/spi/impl/MockLocalScheduler.java +++ b/java/runtime-dev/src/main/java/org/ray/spi/impl/MockLocalScheduler.java @@ -3,6 +3,9 @@ package org.ray.spi.impl; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; + +import org.ray.api.RayObject; +import org.ray.api.WaitResult; import org.ray.api.id.UniqueId; import org.ray.core.LocalFunctionManager; import org.ray.core.Worker; @@ -70,16 +73,6 @@ public class MockLocalScheduler implements LocalSchedulerLink { throw new RuntimeException("invalid execution flow here"); } - @Override - public void markTaskPutDependency(UniqueId taskId, UniqueId objectId) { - - } - - @Override - public void reconstructObject(UniqueId objectId, boolean fetchOnly) { - - } - @Override public void reconstructObjects(List objectIds, boolean fetchOnly) { @@ -96,8 +89,8 @@ public class MockLocalScheduler implements LocalSchedulerLink { } @Override - public List wait(byte[][] objectIds, int timeoutMs, int numReturns) { - return store.wait(objectIds, timeoutMs, numReturns); + public WaitResult wait(List> waitFor, int numReturns, int timeoutMs) { + throw new RuntimeException("Not implemented here."); } @Override diff --git a/java/runtime-native/src/main/java/org/ray/core/impl/RayNativeRuntime.java b/java/runtime-native/src/main/java/org/ray/core/impl/RayNativeRuntime.java index 8085de80f..2c7d7b634 100644 --- a/java/runtime-native/src/main/java/org/ray/core/impl/RayNativeRuntime.java +++ b/java/runtime-native/src/main/java/org/ray/core/impl/RayNativeRuntime.java @@ -18,9 +18,8 @@ import org.ray.spi.RemoteFunctionManager; import org.ray.spi.StateStoreProxy; import org.ray.spi.impl.DefaultLocalSchedulerClient; import org.ray.spi.impl.NativeRemoteFunctionManager; -import org.ray.spi.impl.NonRayletStateStoreProxyImpl; -import org.ray.spi.impl.RayletStateStoreProxyImpl; import org.ray.spi.impl.RedisClient; +import org.ray.spi.impl.StateStoreProxyImpl; import org.ray.spi.model.AddressInfo; import org.ray.util.logger.RayLog; @@ -53,19 +52,14 @@ public final class RayNativeRuntime extends AbstractRayRuntime { throw new Error("Redis address must be configured under Worker mode."); } startOnebox(params, pathConfig); - initStateStore(params.redis_address, params.use_raylet); + initStateStore(params.redis_address); } else { - initStateStore(params.redis_address, params.use_raylet); + initStateStore(params.redis_address); if (!isWorker) { List nodes = stateStoreProxy.getAddressInfo( params.node_ip_address, params.redis_address, 5); params.object_store_name = nodes.get(0).storeName; - if (!params.use_raylet) { - params.object_store_manager_name = nodes.get(0).managerName; - params.local_scheduler_name = nodes.get(0).schedulerName; - } else { - params.raylet_socket_name = nodes.get(0).rayletSocketName; - } + params.raylet_socket_name = nodes.get(0).rayletSocketName; } } @@ -91,54 +85,29 @@ public final class RayNativeRuntime extends AbstractRayRuntime { } if (params.worker_mode != WorkerMode.NONE) { - String overwrites = ""; // initialize the links int releaseDelay = AbstractRayRuntime.configReader .getIntegerValue("ray", "plasma_default_release_delay", 0, "how many release requests should be delayed in plasma client"); - if (!params.use_raylet) { - ObjectStoreLink plink = new PlasmaClient(params.object_store_name, - params.object_store_manager_name, releaseDelay); + ObjectStoreLink plink = new PlasmaClient(params.object_store_name, "", releaseDelay); + LocalSchedulerLink slink = new DefaultLocalSchedulerClient( + params.raylet_socket_name, + WorkerContext.currentWorkerId(), + isWorker, + WorkerContext.currentTask().taskId + ); - LocalSchedulerLink slink = new DefaultLocalSchedulerClient( - params.local_scheduler_name, - WorkerContext.currentWorkerId(), - isWorker, - WorkerContext.currentTask().taskId, - false - ); + init(slink, plink, funcMgr, pathConfig); - init(slink, plink, funcMgr, pathConfig); + // register + registerWorker(isWorker, params.node_ip_address, params.object_store_name, + params.raylet_socket_name); - // register - registerWorker(isWorker, params.node_ip_address, params.object_store_name, - params.object_store_manager_name, params.local_scheduler_name); - } else { - - ObjectStoreLink plink = new PlasmaClient(params.object_store_name, "", releaseDelay); - - LocalSchedulerLink slink = new DefaultLocalSchedulerClient( - params.raylet_socket_name, - WorkerContext.currentWorkerId(), - isWorker, - WorkerContext.currentTask().taskId, - true - ); - - init(slink, plink, funcMgr, pathConfig); - - // register - registerWorker(isWorker, params.node_ip_address, params.object_store_name, - params.raylet_socket_name); - } } - RayLog.core.info("RayNativeRuntime start with " - + "store " + params.object_store_name - + ", manager " + params.object_store_manager_name - + ", scheduler " + params.local_scheduler_name - ); + RayLog.core.info("RayNativeRuntime started with store {}, raylet {}", + params.object_store_name, params.raylet_socket_name); } @Override @@ -155,19 +124,14 @@ public final class RayNativeRuntime extends AbstractRayRuntime { params.redis_address = manager.info().redisAddress; params.object_store_name = manager.info().localStores.get(0).storeName; - params.object_store_manager_name = manager.info().localStores.get(0).managerName; - params.local_scheduler_name = manager.info().localStores.get(0).schedulerName; params.raylet_socket_name = manager.info().localStores.get(0).rayletSocketName; //params.node_ip_address = NetworkUtil.getIpAddress(); } - private void initStateStore(String redisAddress, boolean useRaylet) throws Exception { + private void initStateStore(String redisAddress) throws Exception { kvStore = new RedisClient(); kvStore.setAddr(redisAddress); - stateStoreProxy = useRaylet - ? new RayletStateStoreProxyImpl(kvStore) - : new NonRayletStateStoreProxyImpl(kvStore); - //stateStoreProxy.setStore(kvStore); + stateStoreProxy = new StateStoreProxyImpl(kvStore); stateStoreProxy.initializeGlobalState(); } @@ -193,27 +157,4 @@ public final class RayNativeRuntime extends AbstractRayRuntime { } } - private void registerWorker(boolean isWorker, String nodeIpAddress, String storeName, - String managerName, String schedulerName) { - Map workerInfo = new HashMap<>(); - String workerId = new String(WorkerContext.currentWorkerId().getBytes()); - if (!isWorker) { - workerInfo.put("node_ip_address", nodeIpAddress); - workerInfo.put("driver_id", workerId); - workerInfo.put("start_time", String.valueOf(System.currentTimeMillis())); - workerInfo.put("plasma_store_socket", storeName); - workerInfo.put("plasma_manager_socket", managerName); - workerInfo.put("local_scheduler_socket", schedulerName); - workerInfo.put("name", System.getProperty("user.dir")); - //TODO: worker.redis_client.hmset(b"Drivers:" + worker.workerId, driver_info) - kvStore.hmset("Drivers:" + workerId, workerInfo); - } else { - workerInfo.put("node_ip_address", nodeIpAddress); - workerInfo.put("plasma_store_socket", storeName); - workerInfo.put("plasma_manager_socket", managerName); - workerInfo.put("local_scheduler_socket", schedulerName); - //TODO: b"Workers:" + worker.workerId, - kvStore.hmset("Workers:" + workerId, workerInfo); - } - } } diff --git a/java/runtime-native/src/main/java/org/ray/runner/RunInfo.java b/java/runtime-native/src/main/java/org/ray/runner/RunInfo.java index cddf74f13..ae12d8603 100644 --- a/java/runtime-native/src/main/java/org/ray/runner/RunInfo.java +++ b/java/runtime-native/src/main/java/org/ray/runner/RunInfo.java @@ -34,8 +34,12 @@ public class RunInfo { } public enum ProcessType { - PT_WORKER, PT_LOCAL_SCHEDULER, PT_PLASMA_MANAGER, PT_PLASMA_STORE, - PT_GLOBAL_SCHEDULER, PT_REDIS_SERVER, PT_WEB_UI, PT_RAYLET, + PT_WORKER, + PT_PLASMA_STORE, + PT_REDIS_SERVER, + PT_WEB_UI, + PT_RAYLET, PT_DRIVER } + } diff --git a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java index 95ff76e3a..d99f9b4cd 100644 --- a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java +++ b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java @@ -29,8 +29,6 @@ import redis.clients.jedis.Jedis; */ public class RunManager { - public static final int INT16_MAX = 32767; - private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("Y-m-d_H-M-S"); @@ -71,13 +69,7 @@ public class RunManager { if (params.num_redis_shards <= 0) { params.num_redis_shards = 1; } - if (params.num_local_schedulers <= 0) { - params.num_local_schedulers = 1; - } - params.start_workers_from_local_scheduler = params.run_mode != RunMode.SINGLE_BOX; - - params.include_global_scheduler = true; params.start_redis_shards = true; startRayProcesses(); @@ -90,12 +82,7 @@ public class RunManager { if (params.num_redis_shards != 0) { throw new Exception("Number of redis shards should be zero in non-head node."); } - if (params.num_local_schedulers <= 0) { - params.num_local_schedulers = 1; - } - //params.start_workers_from_local_scheduler = true; - params.include_global_scheduler = false; params.start_redis_shards = false; startRayProcesses(); @@ -281,120 +268,34 @@ public class RunManager { } redisClient.close(); - // start global scheduler - if (params.include_global_scheduler && !params.use_raylet) { - startGlobalScheduler( - params.redis_address, params.node_ip_address, params.redirect, params.cleanup); - } - - // prepare parameters for node processes - if (params.num_cpus.length == 0) { - params.num_cpus = new int[params.num_local_schedulers]; - for (int i = 0; i < params.num_local_schedulers; i++) { - params.num_cpus[i] = 1; - } - } else { - assert (params.num_cpus.length == params.num_local_schedulers); - } - - if (params.num_gpus.length == 0) { - params.num_gpus = new int[params.num_local_schedulers]; - for (int i = 0; i < params.num_local_schedulers; i++) { - params.num_gpus[i] = 0; - } - } else { - assert (params.num_gpus.length == params.num_local_schedulers); - } - - int[] localNumWorkers = new int[params.num_local_schedulers]; - if (params.num_workers == 0) { - System.arraycopy(params.num_cpus, 0, localNumWorkers, 0, params.num_local_schedulers); - } else { - for (int i = 0; i < params.num_local_schedulers; i++) { - localNumWorkers[i] = params.num_workers; - } - } - AddressInfo info = new AddressInfo(); - if (params.use_raylet) { - // Start object store - int rpcPort = params.object_store_rpc_port; - String storeName = "/tmp/plasma_store" + rpcPort; + // Start object store + int rpcPort = params.object_store_rpc_port; + String storeName = "/tmp/plasma_store" + rpcPort; - startObjectStore(0, info, - params.redis_address, params.node_ip_address, params.redirect, params.cleanup); - - Map staticResources = - ResourceUtil.getResourcesMapFromString(params.static_resources); - - //Start raylet - startRaylet(storeName, info, params.num_workers, - params.redis_address, - params.node_ip_address, params.redirect, staticResources, params.cleanup); - - runInfo.localStores.add(info); - } else { - for (int i = 0; i < params.num_local_schedulers; i++) { - // Start object stores - startObjectStore(i, info, + startObjectStore(0, info, params.redis_address, params.node_ip_address, params.redirect, params.cleanup); - startObjectManager(i, info, + Map staticResources = + ResourceUtil.getResourcesMapFromString(params.static_resources); + + //Start raylet + startRaylet(storeName, info, params.num_workers, params.redis_address, - params.node_ip_address, params.redirect, params.cleanup); + params.node_ip_address, params.redirect, staticResources, params.cleanup); - // Start local scheduler - int workerCount = 0; + runInfo.localStores.add(info); - if (params.start_workers_from_local_scheduler) { - workerCount = localNumWorkers[i]; - localNumWorkers[i] = 0; - } - - startLocalScheduler(i, info, - params.num_cpus[i], params.num_gpus[i], workerCount, - params.redis_address, - params.node_ip_address, params.redirect, params.cleanup); - - runInfo.localStores.add(info); - } - } - - // start local workers - if (!params.use_raylet) { - for (int i = 0; i < params.num_local_schedulers; i++) { - AddressInfo localStores = runInfo.localStores.get(i); - localStores.workerCount = localNumWorkers[i]; - for (int j = 0; j < localNumWorkers[i]; j++) { - startWorker(localStores.storeName, localStores.managerName, localStores.schedulerName, - "/worker" + i + "." + j, params.redis_address, - params.node_ip_address, UniqueId.NIL, "", params.redirect, params.cleanup); - } - } - } - - HashSet excludeTypes = new HashSet<>(); - if (!params.use_raylet) { - excludeTypes.add(RunInfo.ProcessType.PT_RAYLET); - } else { - excludeTypes.add(RunInfo.ProcessType.PT_LOCAL_SCHEDULER); - excludeTypes.add(RunInfo.ProcessType.PT_GLOBAL_SCHEDULER); - excludeTypes.add(RunInfo.ProcessType.PT_PLASMA_MANAGER); - } - if (!checkAlive(excludeTypes)) { + if (!checkAlive()) { cleanup(true); throw new RuntimeException("Start Ray processes failed"); } } - private boolean checkAlive(HashSet excludeTypes) { + private boolean checkAlive() { RunInfo.ProcessType[] types = RunInfo.ProcessType.values(); for (int i = 0; i < types.length; i++) { - if (excludeTypes.contains(types[i])) { - continue; - } - ProcessInfo p; for (int j = 0; j < runInfo.allProcesses.get(i).size(); ) { p = runInfo.allProcesses.get(i).get(j); @@ -513,96 +414,6 @@ public class RunManager { return ip + ":" + port; } - private void startGlobalScheduler(String redisAddress, String ip, - boolean redirect, boolean cleanup) { - String filePath = paths.global_scheduler; - String cmd = filePath + " -r " + redisAddress + " -h " + ip; - - Map env = null; - startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_GLOBAL_SCHEDULER, "global_scheduler", - redisAddress, - ip, redirect, cleanup); - } - - /* - * @param storeName The name of the plasma store socket to connect to - * - * @param storeManagerName The name of the plasma manager socket to connect - * to - * - * @param storeManagerAddress the address of the plasma manager to connect - * to - * - * @param workerPath The path of the script to use when the local scheduler - * starts up new workers - * - * @param numCpus The number of CPUs the local scheduler should be - * configured with - * - * @param numGpus The number of GPUs the local scheduler should be - * configured with - * - * @param numWorkers The number of workers that the local scheduler should - * start - */ - private void startLocalScheduler(int index, AddressInfo info, int numCpus, - int numGpus, int numWorkers, - String redisAddress, String ip, boolean redirect, - boolean cleanup) { - //if (numCpus <= 0) - // numCpus = Runtime.getRuntime().availableProcessors(); - if (numGpus <= 0) { - numGpus = 0; - } - - String filePath = paths.local_scheduler; - int rpcPort = params.local_scheduler_rpc_port + index; - String name = "/tmp/scheduler" + rpcPort; - String rpcAddr = ""; - String cmd = filePath + " -s " + name + " -p " + info.storeName + " -h " + ip + " -n " - + numWorkers + " -c " + "CPU," + INT16_MAX + ",GPU,0"; - - assert (info.managerName.length() > 0); - assert (info.storeName.length() > 0); - assert (redisAddress.length() > 0); - - cmd += " -m " + info.managerName; - - String workerCmd = null; - workerCmd = buildWorkerCommand(true, info.storeName, info.managerName, name, - UniqueId.NIL, "", ip, redisAddress); - cmd += " -w \"" + workerCmd + "\""; - - if (redisAddress.length() > 0) { - cmd += " -r " + redisAddress; - } - if (info.managerPort > 0) { - cmd += " -a " + params.node_ip_address + ":" + info.managerPort; - } - - Map env = null; - String[] cmds = StringUtil.split(cmd, " ", "\"", "\"").toArray(new String[0]); - Process p = startProcess(cmds, env, RunInfo.ProcessType.PT_LOCAL_SCHEDULER, - "local_scheduler", redisAddress, ip, redirect, cleanup); - - if (p != null && p.isAlive()) { - try { - TimeUnit.MILLISECONDS.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - if (p == null || !p.isAlive()) { - info.schedulerName = ""; - info.schedulerRpcAddr = ""; - throw new RuntimeException("Start local scheduler failed ..."); - } else { - info.schedulerName = name; - info.schedulerRpcAddr = rpcAddr; - } - } - private void startRaylet(String storeName, AddressInfo info, int numWorkers, String redisAddress, String ip, boolean redirect, Map staticResources, boolean cleanup) { @@ -658,39 +469,7 @@ public class RunManager { String ip, String redisAddress) { String workerConfigs = "ray.java.start.object_store_name=" + storeName + ";ray.java.start.raylet_socket_name=" + rayletSocketName - + ";ray.java.start.worker_mode=WORKER;ray.java.start.use_raylet=true"; - workerConfigs += ";ray.java.start.deploy=" + params.deploy; - if (!actorId.equals(UniqueId.NIL)) { - workerConfigs += ";ray.java.start.actor_id=" + actorId; - } - if (!actorClass.equals("")) { - workerConfigs += ";ray.java.start.driver_class=" + actorClass; - } - - String jvmArgs = ""; - jvmArgs += " -Dlogging.path=" + params.log_dir; - jvmArgs += " -Dlogging.file.name=core-*pid_suffix*"; - - return buildJavaProcessCommand( - RunInfo.ProcessType.PT_WORKER, - "org.ray.runner.worker.DefaultWorker", - "", - workerConfigs, - jvmArgs, - ip, - redisAddress, - null - ); - } - - private String buildWorkerCommand(boolean isFromLocalScheduler, String storeName, - String storeManagerName, String localSchedulerName, - UniqueId actorId, String actorClass, String - ip, String redisAddress) { - String workerConfigs = "ray.java.start.object_store_name=" + storeName - + ";ray.java.start.object_store_manager_name=" + storeManagerName - + ";ray.java.start.worker_mode=WORKER" - + ";ray.java.start.local_scheduler_name=" + localSchedulerName; + + ";ray.java.start.worker_mode=WORKER"; workerConfigs += ";ray.java.start.deploy=" + params.deploy; if (!actorId.equals(UniqueId.NIL)) { workerConfigs += ";ray.java.start.actor_id=" + actorId; @@ -747,47 +526,4 @@ public class RunManager { } } - private AddressInfo startObjectManager(int index, AddressInfo info, - String redisAddress, String ip, boolean redirect, - boolean cleanup) { - String filePath = paths.store_manager; - int rpcPort = params.object_store_manager_rpc_port + index; - String name = "/tmp/plasma_manager" + rpcPort; - String rpcAddr = ""; - - String cmd = filePath + " -s " + info.storeName + " -m " + name + " -h " + ip + " -p " - + (params.object_store_manager_ray_listen_port + index) - + " -r " + redisAddress; - - Map env = null; - Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_PLASMA_MANAGER, - "object_manager", redisAddress, ip, redirect, cleanup); - - if (p != null && p.isAlive()) { - try { - TimeUnit.MILLISECONDS.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - if (p == null || !p.isAlive()) { - throw new RuntimeException("Start object manager failed ..."); - } else { - info.managerName = name; - info.managerPort = params.object_store_manager_ray_listen_port + index; - info.managerRpcAddr = rpcAddr; - return info; - } - } - - public void startWorker(String storeName, String storeManagerName, - String localSchedulerName, String workerName, String redisAddress, - String ip, UniqueId actorId, String actorClass, - boolean redirect, boolean cleanup) { - String cmd = buildWorkerCommand(false, storeName, storeManagerName, localSchedulerName, actorId, - actorClass, ip, redisAddress); - startProcess(cmd.split(" "), null, RunInfo.ProcessType.PT_WORKER, workerName, redisAddress, ip, - redirect, cleanup); - } } diff --git a/java/runtime-native/src/main/java/org/ray/spi/impl/DefaultLocalSchedulerClient.java b/java/runtime-native/src/main/java/org/ray/spi/impl/DefaultLocalSchedulerClient.java index 8ee86babc..38d61e92b 100644 --- a/java/runtime-native/src/main/java/org/ray/spi/impl/DefaultLocalSchedulerClient.java +++ b/java/runtime-native/src/main/java/org/ray/spi/impl/DefaultLocalSchedulerClient.java @@ -7,6 +7,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.ray.api.RayObject; +import org.ray.api.WaitResult; import org.ray.api.id.UniqueId; import org.ray.core.AbstractRayRuntime; import org.ray.core.UniqueIdHelper; @@ -28,48 +30,38 @@ public class DefaultLocalSchedulerClient implements LocalSchedulerLink { return bb; }); private long client = 0; - boolean useRaylet = false; public DefaultLocalSchedulerClient(String schedulerSockName, UniqueId clientId, - boolean isWorker, UniqueId driverId, boolean useRaylet) { + boolean isWorker, UniqueId driverId) { client = nativeInit(schedulerSockName, clientId.getBytes(), - isWorker, driverId.getBytes(), useRaylet); - this.useRaylet = useRaylet; + isWorker, driverId.getBytes()); } @Override - public List wait(byte[][] objectIds, int timeoutMs, int numReturns) { - assert (useRaylet == true); + public WaitResult wait(List> waitFor, int numReturns, int timeoutMs) { + List ids = new ArrayList<>(); + for (RayObject element : waitFor) { + ids.add(element.getId()); + } - boolean[] readys = nativeWaitObject(client, objectIds, numReturns, timeoutMs, false); - assert (readys.length == objectIds.length); + boolean[] ready = nativeWaitObject(client, getIdBytes(ids), numReturns, timeoutMs, false); + List> readyList = new ArrayList<>(); + List> unreadyList = new ArrayList<>(); - List ret = new ArrayList<>(); - for (int i = 0; i < readys.length; i++) { - if (readys[i]) { - ret.add(objectIds[i]); + for (int i = 0; i < ready.length; i++) { + if (ready[i]) { + readyList.add(waitFor.get(i)); + } else { + unreadyList.add(waitFor.get(i)); } } - return ret; + return new WaitResult<>(readyList, unreadyList); } @Override public void submitTask(TaskSpec task) { RayLog.core.debug("Submitting task: {}", task); - // We don't support resources management in non raylet mode. - if (!useRaylet) { - task.resources.clear(); - task.resources.put(ResourceUtil.CPU_LITERAL, 0.0); - } else { - if (!task.resources.containsKey(ResourceUtil.CPU_LITERAL)) { - task.resources.put(ResourceUtil.CPU_LITERAL, 0.0); - } - - if (!task.resources.containsKey(ResourceUtil.GPU_LITERAL)) { - task.resources.put(ResourceUtil.GPU_LITERAL, 0.0); - } - } ByteBuffer info = taskSpec2Info(task); byte[] cursorId = null; @@ -77,29 +69,17 @@ public class DefaultLocalSchedulerClient implements LocalSchedulerLink { cursorId = task.cursorId.getBytes(); } - nativeSubmitTask(client, cursorId, info, info.position(), info.remaining(), useRaylet); + nativeSubmitTask(client, cursorId, info, info.position(), info.remaining()); } @Override public TaskSpec getTask() { - byte[] bytes = nativeGetTask(client, useRaylet); + byte[] bytes = nativeGetTask(client); assert (null != bytes); ByteBuffer bb = ByteBuffer.wrap(bytes); return taskInfo2Spec(bb); } - @Override - public void markTaskPutDependency(UniqueId taskId, UniqueId objectId) { - nativePutObject(client, taskId.getBytes(), objectId.getBytes()); - } - - @Override - public void reconstructObject(UniqueId objectId, boolean fetchOnly) { - List objects = new ArrayList<>(); - objects.add(objectId); - nativeReconstructObjects(client, getIdBytes(objects), fetchOnly); - } - @Override public void reconstructObjects(List objectIds, boolean fetchOnly) { if (RayLog.core.isInfoEnabled()) { @@ -289,13 +269,13 @@ public class DefaultLocalSchedulerClient implements LocalSchedulerLink { /// 6) popd private static native long nativeInit(String localSchedulerSocket, byte[] workerId, - boolean isWorker, byte[] driverTaskId, boolean useRaylet); + boolean isWorker, byte[] driverTaskId); private static native void nativeSubmitTask(long client, byte[] cursorId, ByteBuffer taskBuff, - int pos, int taskSize, boolean useRaylet); + int pos, int taskSize); // return TaskInfo (in FlatBuffer) - private static native byte[] nativeGetTask(long client, boolean useRaylet); + private static native byte[] nativeGetTask(long client); private static native void nativeDestroy(long client); diff --git a/java/runtime-native/src/main/java/org/ray/spi/impl/NonRayletStateStoreProxyImpl.java b/java/runtime-native/src/main/java/org/ray/spi/impl/NonRayletStateStoreProxyImpl.java deleted file mode 100644 index f00267df1..000000000 --- a/java/runtime-native/src/main/java/org/ray/spi/impl/NonRayletStateStoreProxyImpl.java +++ /dev/null @@ -1,115 +0,0 @@ -package org.ray.spi.impl; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.ray.spi.KeyValueStoreLink; -import org.ray.spi.model.AddressInfo; - -/** - * A class used to interface with the Ray control state for non-raylet. - */ -public class NonRayletStateStoreProxyImpl extends BaseStateStoreProxyImpl { - public NonRayletStateStoreProxyImpl(KeyValueStoreLink rayKvStore) { - super(rayKvStore); - } - - /* - * get address info of one node from primary redis - * @param: node ip address, usually local ip address - * @return: a list of SchedulerInfo which contains storeName, managerName, managerPort and - * schedulerName - * @note:Redis data key is "CL:*", redis data value is a hash. - * The hash contains the following: - * "deleted" : 0/1 - * "ray_client_id" - * "node_ip_address" - * "client_type" : plasma_manager/local_scheduler - * "store_socket_name"(op) - * "manager_socket_name"(op) - * "local_scheduler_socket_name"(op) - */ - @Override - public List doGetAddressInfo(final String nodeIpAddress, - final String redisAddress) throws Exception { - if (this.rayKvStore == null) { - throw new Exception("no redis client when use doGetAddressInfo"); - } - List schedulerInfo = new ArrayList<>(); - - Set cks = rayKvStore.keys("CL:*".getBytes()); - byte[] key; - List> plasmaManager = new ArrayList<>(); - List> localScheduler = new ArrayList<>(); - for (byte[] ck : cks) { - key = ck; - Map info = rayKvStore.hgetAll(key); - - String deleted = charsetDecode(info.get("deleted".getBytes()), "US-ASCII"); - if (deleted != null) { - if (Boolean.getBoolean(deleted)) { - continue; - } - } - - if (!info.containsKey("ray_client_id".getBytes())) { - throw new Exception("no ray_client_id in any client"); - } else if (!info.containsKey("node_ip_address".getBytes())) { - throw new Exception("no node_ip_address in any client"); - } else if (!info.containsKey("client_type".getBytes())) { - throw new Exception("no client_type in any client"); - } - - if (charsetDecode(info.get("node_ip_address".getBytes()), "US-ASCII") - .equals(nodeIpAddress)) { - String clientType = charsetDecode(info.get("client_type".getBytes()), "US-ASCII"); - if ("plasma_manager".equals(clientType)) { - plasmaManager.add(info); - } else if ("local_scheduler".equals(clientType)) { - localScheduler.add(info); - } - } - } - - if (plasmaManager.size() < 1 || localScheduler.size() < 1) { - throw new Exception("no plasma_manager or local_scheduler"); - } else if (plasmaManager.size() != localScheduler.size()) { - throw new Exception("plasma_manager number not Equal local_scheduler number"); - } - - for (int i = 0; i < plasmaManager.size(); i++) { - AddressInfo si = new AddressInfo(); - si.storeName = charsetDecode(plasmaManager.get(i).get("store_socket_name".getBytes()), - "US-ASCII"); - si.managerName = charsetDecode(plasmaManager.get(i).get("manager_socket_name".getBytes()), - "US-ASCII"); - - byte[] rpc = plasmaManager.get(i).get("manager_rpc_name".getBytes()); - if (rpc != null) { - si.managerRpcAddr = charsetDecode(rpc, "US-ASCII"); - } - - rpc = plasmaManager.get(i).get("store_rpc_name".getBytes()); - if (rpc != null) { - si.storeRpcAddr = charsetDecode(rpc, "US-ASCII"); - } - - String managerAddr = charsetDecode(plasmaManager.get(i).get("manager_address".getBytes()), - "US-ASCII"); - si.managerPort = Integer.parseInt(managerAddr.split(":")[1]); - si.schedulerName = charsetDecode( - localScheduler.get(i).get("local_scheduler_socket_name".getBytes()), "US-ASCII"); - - rpc = localScheduler.get(i).get("local_scheduler_rpc_name".getBytes()); - if (rpc != null) { - si.schedulerRpcAddr = charsetDecode(rpc, "US-ASCII"); - } - - schedulerInfo.add(si); - } - - return schedulerInfo; - } - -} diff --git a/java/runtime-native/src/main/java/org/ray/spi/impl/RayletStateStoreProxyImpl.java b/java/runtime-native/src/main/java/org/ray/spi/impl/RayletStateStoreProxyImpl.java deleted file mode 100644 index c2b5f5574..000000000 --- a/java/runtime-native/src/main/java/org/ray/spi/impl/RayletStateStoreProxyImpl.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.ray.spi.impl; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import org.ray.api.id.UniqueId; -import org.ray.format.gcs.ClientTableData; -import org.ray.spi.KeyValueStoreLink; -import org.ray.spi.model.AddressInfo; -import org.ray.util.NetworkUtil; - -/** - * A class used to interface with the Ray control state for raylet. - */ -public class RayletStateStoreProxyImpl extends BaseStateStoreProxyImpl { - - public RayletStateStoreProxyImpl(KeyValueStoreLink rayKvStore) { - super(rayKvStore); - } - - @Override - public List doGetAddressInfo(final String nodeIpAddress, - final String redisAddress) throws Exception { - if (this.rayKvStore == null) { - throw new Exception("no redis client when use doGetAddressInfo"); - } - List schedulerInfo = new ArrayList<>(); - - byte[] prefix = "CLIENT".getBytes(); - byte[] postfix = UniqueId.genNil().getBytes(); - byte[] clientKey = new byte[prefix.length + postfix.length]; - System.arraycopy(prefix, 0, clientKey, 0, prefix.length); - System.arraycopy(postfix, 0, clientKey, prefix.length, postfix.length); - - Set clients = rayKvStore.zrange(clientKey, 0, -1); - - for (byte[] clientMessage : clients) { - ByteBuffer bb = ByteBuffer.wrap(clientMessage); - ClientTableData client = ClientTableData.getRootAsClientTableData(bb); - String clientNodeIpAddress = client.nodeManagerAddress(); - - String localIpAddress = NetworkUtil.getIpAddress(null); - String redisIpAddress = redisAddress.substring(0, redisAddress.indexOf(':')); - - boolean headNodeAddress = "127.0.0.1".equals(clientNodeIpAddress) - && Objects.equals(redisIpAddress, localIpAddress); - boolean notHeadNodeAddress = Objects.equals(clientNodeIpAddress, nodeIpAddress); - - if (headNodeAddress || notHeadNodeAddress) { - AddressInfo si = new AddressInfo(); - si.storeName = client.objectStoreSocketName(); - si.rayletSocketName = client.rayletSocketName(); - si.managerRpcAddr = client.nodeManagerAddress(); - si.managerPort = client.nodeManagerPort(); - schedulerInfo.add(si); - } - } - return schedulerInfo; - } -} diff --git a/java/runtime-native/src/main/java/org/ray/spi/impl/BaseStateStoreProxyImpl.java b/java/runtime-native/src/main/java/org/ray/spi/impl/StateStoreProxyImpl.java similarity index 58% rename from java/runtime-native/src/main/java/org/ray/spi/impl/BaseStateStoreProxyImpl.java rename to java/runtime-native/src/main/java/org/ray/spi/impl/StateStoreProxyImpl.java index 9d01134f5..6fb264211 100644 --- a/java/runtime-native/src/main/java/org/ray/spi/impl/BaseStateStoreProxyImpl.java +++ b/java/runtime-native/src/main/java/org/ray/spi/impl/StateStoreProxyImpl.java @@ -1,26 +1,30 @@ package org.ray.spi.impl; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.ray.api.id.UniqueId; +import org.ray.format.gcs.ClientTableData; import org.ray.spi.KeyValueStoreLink; import org.ray.spi.StateStoreProxy; import org.ray.spi.model.AddressInfo; +import org.ray.util.NetworkUtil; import org.ray.util.logger.RayLog; /** - * Base class used to interface with the Ray control state. + * A class used to interface with the Ray control state. */ -public abstract class BaseStateStoreProxyImpl implements StateStoreProxy { +public class StateStoreProxyImpl implements StateStoreProxy { public KeyValueStoreLink rayKvStore; public ArrayList shardStoreList = new ArrayList<>(); - public BaseStateStoreProxyImpl(KeyValueStoreLink rayKvStore) { + public StateStoreProxyImpl(KeyValueStoreLink rayKvStore) { this.rayKvStore = rayKvStore; } @@ -49,7 +53,7 @@ public abstract class BaseStateStoreProxyImpl implements StateStoreProxy { Set distinctIpAddress = new HashSet(ipAddressPorts); if (distinctIpAddress.size() != numRedisShards) { es = String.format("Expected %d Redis shard addresses, found2 %d.", numRedisShards, - distinctIpAddress.size()); + distinctIpAddress.size()); throw new Exception(es); } @@ -78,7 +82,7 @@ public abstract class BaseStateStoreProxyImpl implements StateStoreProxy { @Override public List getAddressInfo(final String nodeIpAddress, - final String redisAddress, + final String redisAddress, int numRetries) { int count = 0; while (count < numRetries) { @@ -86,11 +90,11 @@ public abstract class BaseStateStoreProxyImpl implements StateStoreProxy { return doGetAddressInfo(nodeIpAddress, redisAddress); } catch (Exception e) { try { - RayLog.core.warn("Error occurred in BaseStateStoreProxyImpl getAddressInfo, " - + (numRetries - count) + " retries remaining", e); + RayLog.core.warn("Error occurred in StateStoreProxyImpl getAddressInfo, " + + (numRetries - count) + " retries remaining", e); TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException ie) { - RayLog.core.error("error at BaseStateStoreProxyImpl getAddressInfo", e); + RayLog.core.error("error at StateStoreProxyImpl getAddressInfo", e); throw new RuntimeException(e); } } @@ -108,8 +112,44 @@ public abstract class BaseStateStoreProxyImpl implements StateStoreProxy { * @return A list of SchedulerInfo which contains node manager or local scheduler address info. * @throws Exception No redis client exception. */ - protected abstract List doGetAddressInfo(final String nodeIpAddress, - final String redisAddress) throws Exception; + public List doGetAddressInfo(final String nodeIpAddress, + final String redisAddress) throws Exception { + if (this.rayKvStore == null) { + throw new Exception("no redis client when use doGetAddressInfo"); + } + List schedulerInfo = new ArrayList<>(); + + byte[] prefix = "CLIENT".getBytes(); + byte[] postfix = UniqueId.genNil().getBytes(); + byte[] clientKey = new byte[prefix.length + postfix.length]; + System.arraycopy(prefix, 0, clientKey, 0, prefix.length); + System.arraycopy(postfix, 0, clientKey, prefix.length, postfix.length); + + Set clients = rayKvStore.zrange(clientKey, 0, -1); + + for (byte[] clientMessage : clients) { + ByteBuffer bb = ByteBuffer.wrap(clientMessage); + ClientTableData client = ClientTableData.getRootAsClientTableData(bb); + String clientNodeIpAddress = client.nodeManagerAddress(); + + String localIpAddress = NetworkUtil.getIpAddress(null); + String redisIpAddress = redisAddress.substring(0, redisAddress.indexOf(':')); + + boolean headNodeAddress = "127.0.0.1".equals(clientNodeIpAddress) + && Objects.equals(redisIpAddress, localIpAddress); + boolean notHeadNodeAddress = Objects.equals(clientNodeIpAddress, nodeIpAddress); + + if (headNodeAddress || notHeadNodeAddress) { + AddressInfo si = new AddressInfo(); + si.storeName = client.objectStoreSocketName(); + si.rayletSocketName = client.rayletSocketName(); + si.managerRpcAddr = client.nodeManagerAddress(); + si.managerPort = client.nodeManagerPort(); + schedulerInfo.add(si); + } + } + return schedulerInfo; + } protected String charsetDecode(byte[] bs, String charset) throws UnsupportedEncodingException { return new String(bs, charset); diff --git a/java/test.sh b/java/test.sh index 3e81151fa..542b85ac5 100755 --- a/java/test.sh +++ b/java/test.sh @@ -18,14 +18,6 @@ check_style=$(mvn checkstyle:check) echo "${check_style}" [[ ${check_style} =~ "BUILD FAILURE" ]] && exit 1 -# test non-raylet -sed -i 's/^use_raylet.*$/use_raylet = false/g' $ROOT_DIR/../java/ray.config.ini -mvn_test=$(mvn test) -echo "${mvn_test}" -[[ ${mvn_test} =~ "BUILD SUCCESS" ]] || exit 1 - -# test raylet -sed -i 's/^use_raylet.*$/use_raylet = true/g' $ROOT_DIR/../java/ray.config.ini mvn_test=$(mvn test) echo "${mvn_test}" [[ ${mvn_test} =~ "BUILD SUCCESS" ]] || exit 1 diff --git a/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java index 5e1453904..4da3e1c95 100644 --- a/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java +++ b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java @@ -25,7 +25,6 @@ public class PlasmaFreeTest { @Test public void test() { - Assume.assumeTrue(AbstractRayRuntime.getParams().use_raylet); RayObject helloId = Ray.call(PlasmaFreeTest::hello); String helloString = helloId.get(); Assert.assertEquals("hello", helloString); diff --git a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java index d3b12aecb..9188e685a 100644 --- a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java +++ b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java @@ -49,7 +49,6 @@ public class ResourcesManagementTest { @Test public void testMethods() { - Assume.assumeTrue(AbstractRayRuntime.getParams().use_raylet); // This is a case that can satisfy required resources. RayObject result1 = Ray.call(ResourcesManagementTest::echo1, 100); Assert.assertEquals(100, (int) result1.get()); @@ -64,7 +63,6 @@ public class ResourcesManagementTest { @Test public void testActors() { - Assume.assumeTrue(AbstractRayRuntime.getParams().use_raylet); // This is a case that can satisfy required resources. RayActor echo1 = Ray.createActor(Echo1::new); final RayObject result1 = Ray.call(Echo1::echo, echo1, 100); diff --git a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc index 6e54cd6d0..22bba808a 100644 --- a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc +++ b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc @@ -34,7 +34,7 @@ class UniqueIdFromJByteArray { /* * Class: org_ray_spi_impl_DefaultLocalSchedulerClient * Method: nativeInit - * Signature: (Ljava/lang/String;[BZ[BZ)J + * Signature: (Ljava/lang/String;[BZ[B)J */ JNIEXPORT jlong JNICALL Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeInit( @@ -43,14 +43,13 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeInit( jstring sockName, jbyteArray workerId, jboolean isWorker, - jbyteArray driverId, - jboolean useRaylet) { + jbyteArray driverId) { UniqueIdFromJByteArray worker_id(env, workerId); UniqueIdFromJByteArray driver_id(env, driverId); const char *nativeString = env->GetStringUTFChars(sockName, JNI_FALSE); auto client = LocalSchedulerConnection_init(nativeString, *worker_id.PID, isWorker, - *driver_id.PID, useRaylet, Language::JAVA); + *driver_id.PID, true, Language::JAVA); env->ReleaseStringUTFChars(sockName, nativeString); return reinterpret_cast(client); } @@ -58,7 +57,7 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeInit( /* * Class: org_ray_spi_impl_DefaultLocalSchedulerClient * Method: nativeSubmitTask - * Signature: (J[BLjava/nio/ByteBuffer;IIZ)V + * Signature: (J[BLjava/nio/ByteBuffer;II)V */ JNIEXPORT void JNICALL Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeSubmitTask( @@ -68,8 +67,7 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeSubmitTask( jbyteArray cursorId, jobject taskBuff, jint pos, - jint taskSize, - jboolean useRaylet) { + jint taskSize) { auto conn = reinterpret_cast(client); std::vector execution_dependencies; @@ -77,38 +75,27 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeSubmitTask( UniqueIdFromJByteArray cursor_id(env, cursorId); execution_dependencies.push_back(*cursor_id.PID); } - if (!useRaylet) { - TaskSpec *task = - reinterpret_cast(env->GetDirectBufferAddress(taskBuff)) + pos; - TaskExecutionSpec taskExecutionSpec = - TaskExecutionSpec(execution_dependencies, task, taskSize); - local_scheduler_submit(conn, taskExecutionSpec); - } else { - auto data = - reinterpret_cast(env->GetDirectBufferAddress(taskBuff)) + pos; - ray::raylet::TaskSpecification task_spec(std::string(data, taskSize)); - local_scheduler_submit_raylet(conn, execution_dependencies, task_spec); - } + + auto data = + reinterpret_cast(env->GetDirectBufferAddress(taskBuff)) + pos; + ray::raylet::TaskSpecification task_spec(std::string(data, taskSize)); + local_scheduler_submit_raylet(conn, execution_dependencies, task_spec); } /* * Class: org_ray_spi_impl_DefaultLocalSchedulerClient * Method: nativeGetTask - * Signature: (JZ)[B + * Signature: (J)[B */ JNIEXPORT jbyteArray JNICALL -Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeGetTask( - JNIEnv *env, - jclass, - jlong client, - jboolean useRaylet) { +Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeGetTask(JNIEnv *env, + jclass, + jlong client) { auto conn = reinterpret_cast(client); int64_t task_size = 0; // TODO: handle actor failure later - TaskSpec *spec = !useRaylet - ? local_scheduler_get_task(conn, &task_size) - : local_scheduler_get_task_raylet(conn, &task_size); + TaskSpec *spec = local_scheduler_get_task_raylet(conn, &task_size); jbyteArray result; result = env->NewByteArray(task_size); diff --git a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.h b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.h index cafeb643f..2b1dc4633 100644 --- a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.h +++ b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.h @@ -10,7 +10,7 @@ extern "C" { /* * Class: org_ray_spi_impl_DefaultLocalSchedulerClient * Method: nativeInit - * Signature: (Ljava/lang/String;[BZ[BZ)J + * Signature: (Ljava/lang/String;[BZ[B)J */ JNIEXPORT jlong JNICALL Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeInit(JNIEnv *, @@ -18,13 +18,12 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeInit(JNIEnv *, jstring, jbyteArray, jboolean, - jbyteArray, - jboolean); + jbyteArray); /* * Class: org_ray_spi_impl_DefaultLocalSchedulerClient * Method: nativeSubmitTask - * Signature: (J[BLjava/nio/ByteBuffer;IIZ)V + * Signature: (J[BLjava/nio/ByteBuffer;II)V */ JNIEXPORT void JNICALL Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeSubmitTask(JNIEnv *, @@ -33,19 +32,17 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeSubmitTask(JNIEnv *, jbyteArray, jobject, jint, - jint, - jboolean); + jint); /* * Class: org_ray_spi_impl_DefaultLocalSchedulerClient * Method: nativeGetTask - * Signature: (JZ)[B + * Signature: (J)[B */ JNIEXPORT jbyteArray JNICALL Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeGetTask(JNIEnv *, jclass, - jlong, - jboolean); + jlong); /* * Class: org_ray_spi_impl_DefaultLocalSchedulerClient