[Java] Remove non-raylet code in Java. (#2828)

This commit is contained in:
Wang Qing
2018-09-06 14:54:13 +08:00
committed by Hao Chen
parent d81605e9e7
commit 7e13e1fd49
23 changed files with 159 additions and 785 deletions
@@ -129,12 +129,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
functions = new LocalFunctionManager(remoteLoader);
localSchedulerClient = slink;
if (!params.use_raylet) {
objectStoreProxy = new ObjectStoreProxy(plink);
} else {
objectStoreProxy = new ObjectStoreProxy(plink, slink);
}
objectStoreProxy = new ObjectStoreProxy(plink);
worker = new Worker(this);
}
@@ -203,9 +198,6 @@ public abstract class AbstractRayRuntime implements RayRuntime {
public <T> void put(UniqueId objectId, T obj) {
UniqueId taskId = WorkerContext.currentTask().taskId;
RayLog.core.info("Putting object {}, for task {} ", objectId, taskId);
if (!params.use_raylet) {
localSchedulerClient.markTaskPutDependency(taskId, objectId);
}
objectStoreProxy.put(objectId, obj, null);
}
@@ -227,11 +219,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
List<List<UniqueId>> fetchBatches =
splitIntoBatches(objectIds, params.worker_fetch_request_size);
for (List<UniqueId> batch : fetchBatches) {
if (!params.use_raylet) {
objectStoreProxy.fetch(batch);
} else {
localSchedulerClient.reconstructObjects(batch, true);
}
localSchedulerClient.reconstructObjects(batch, true);
}
// Get the objects. We initially try to get the objects immediately.
@@ -256,16 +244,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
splitIntoBatches(unreadyList, params.worker_fetch_request_size);
for (List<UniqueId> batch : reconstructBatches) {
if (!params.use_raylet) {
for (UniqueId objectId : batch) {
localSchedulerClient.reconstructObject(objectId, false);
}
// Do another fetch for objects that aren't available locally yet, in case
// they were evicted since the last fetch.
objectStoreProxy.fetch(batch);
} else {
localSchedulerClient.reconstructObjects(batch, false);
}
localSchedulerClient.reconstructObjects(batch, false);
}
List<Pair<T, GetStatus>> results = objectStoreProxy
@@ -328,7 +307,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
@Override
public <T> WaitResult<T> wait(List<RayObject<T>> waitList, int numReturns, int timeoutMs) {
return objectStoreProxy.wait(waitList, numReturns, timeoutMs);
return localSchedulerClient.wait(waitList, numReturns, timeoutMs);
}
@Override
@@ -28,21 +28,6 @@ public class RayParameters {
@AConfig(comment = "object store rpc listen port")
public int object_store_rpc_port = 32567;
@AConfig(comment = "object store manager name (e.g., /tmp/storeMgr1111")
public String object_store_manager_name = "";
@AConfig(comment = "object store manager rpc listen port")
public int object_store_manager_rpc_port = 33567;
@AConfig(comment = "object store manager ray listen port")
public int object_store_manager_ray_listen_port = 33667;
@AConfig(comment = "local scheduler name (e.g., /tmp/scheduler1111")
public String local_scheduler_name = "";
@AConfig(comment = "local scheduler rpc listen port")
public int local_scheduler_rpc_port = 34567;
@AConfig(comment = "driver ID when the worker is served as a driver")
public UniqueId driver_id = UniqueId.NIL;
@@ -58,30 +43,15 @@ public class RayParameters {
@AConfig(comment = "redirect err and stdout to files for newly created processes")
public boolean redirect = true;
@AConfig(comment = "whether to start the global scheduler")
public boolean include_global_scheduler = false;
@AConfig(comment = "whether to start redis shard server in addition to the primary server")
public boolean start_redis_shards = false;
@AConfig(comment = "whether to clean up the processes when there is a process start failure")
public boolean cleanup = false;
@AConfig(comment = "whether to start workers from within the local schedulers")
public boolean start_workers_from_local_scheduler = true;
@AConfig(comment = "number of cpus assigned to each local scheduler")
public int[] num_cpus = {};
@AConfig(comment = "number of gpus assigned to each local scheduler")
public int[] num_gpus = {};
@AConfig(comment = "number of redis shard servers to be started")
public int num_redis_shards = 0;
@AConfig(comment = "number of local schedulers to be started")
public int num_local_schedulers = 1;
@AConfig(comment = "whether this is a deployment in cluster")
public boolean deploy = false;
@@ -112,9 +82,6 @@ public class RayParameters {
@AConfig(comment = "delay seconds under onebox before app logic for debugging")
public int onebox_delay_seconds_before_run_app_logic = 0;
@AConfig(comment = "whether to use raylet")
public boolean use_raylet = false;
@AConfig(comment = "raylet socket name (e.g., /tmp/raylet1111")
public String raylet_socket_name = "";
@@ -1,6 +1,9 @@
package org.ray.spi;
import java.util.List;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.id.UniqueId;
import org.ray.spi.model.TaskSpec;
@@ -13,17 +16,13 @@ public interface LocalSchedulerLink {
TaskSpec getTask();
void markTaskPutDependency(UniqueId taskId, UniqueId objectId);
void reconstructObject(UniqueId objectId, boolean fetchOnly);
void reconstructObjects(List<UniqueId> objectIds, boolean fetchOnly);
void notifyUnblocked();
UniqueId generateTaskId(UniqueId driverId, UniqueId parentTaskId, int taskIndex);
List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns);
<T> WaitResult<T> wait(List<RayObject<T>> waitFor, int numReturns, int timeoutMs);
void freePlasmaObjects(List<UniqueId> objectIds, boolean localOnly);
}
@@ -18,17 +18,10 @@ import org.ray.util.exception.TaskExecutionException;
public class ObjectStoreProxy {
private final ObjectStoreLink store;
private final LocalSchedulerLink localSchedulerLink;
private final int getTimeoutMs = 1000;
public ObjectStoreProxy(ObjectStoreLink store) {
this.store = store;
this.localSchedulerLink = null;
}
public ObjectStoreProxy(ObjectStoreLink store, LocalSchedulerLink localSchedulerLink) {
this.store = store;
this.localSchedulerLink = localSchedulerLink;
}
public <T> Pair<T, GetStatus> get(UniqueId objectId, boolean isMetadata)
@@ -89,39 +82,6 @@ public class ObjectStoreProxy {
store.put(id.getBytes(), Serializer.encode(obj), Serializer.encode(metadata));
}
public <T> WaitResult<T> wait(List<RayObject<T>> waitfor, int numReturns, int timeout) {
List<UniqueId> ids = new ArrayList<>();
for (RayObject<T> obj : waitfor) {
ids.add(obj.getId());
}
List<byte[]> readys;
if (localSchedulerLink == null) {
readys = store.wait(getIdBytes(ids), timeout, numReturns);
} else {
readys = localSchedulerLink.wait(getIdBytes(ids), timeout, numReturns);
}
List<RayObject<T>> readyList = new ArrayList<>();
List<RayObject<T>> unreadyList = new ArrayList<>();
for (RayObject<T> obj : waitfor) {
if (readys.contains(obj.getId().getBytes())) {
readyList.add(obj);
} else {
unreadyList.add(obj);
}
}
return new WaitResult<>(readyList, unreadyList);
}
public void fetch(List<UniqueId> objectIds) {
if (localSchedulerLink == null) {
store.fetch(getIdBytes(objectIds));
} else {
localSchedulerLink.reconstructObjects(objectIds, true);
}
}
public enum GetStatus {
SUCCESS, FAILED
}
@@ -28,15 +28,6 @@ public class PathConfig {
@AConfig(comment = "path to plasma storage")
public String store;
@AConfig(comment = "path to plasma manager")
public String store_manager;
@AConfig(comment = "path to local scheduler")
public String local_scheduler;
@AConfig(comment = "path to global scheduler")
public String global_scheduler;
@AConfig(comment = "path to raylet")
public String raylet;
@@ -68,6 +68,14 @@ public class TaskSpec {
this.createActorId = createActorId;
this.resources = resources;
this.cursorId = cursorId;
if (!this.resources.containsKey(ResourceUtil.CPU_LITERAL)) {
this.resources.put(ResourceUtil.CPU_LITERAL, 0.0);
}
if (!this.resources.containsKey(ResourceUtil.GPU_LITERAL)) {
this.resources.put(ResourceUtil.GPU_LITERAL, 0.0);
}
}
@Override