diff --git a/.travis.yml b/.travis.yml index 845b63f64..54b8999b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -141,6 +141,7 @@ matrix: - RAY_INSTALL_JAVA=1 - PYTHON=3.6 PYTHONWARNINGS=ignore - RAY_USE_RANDOM_PORTS=1 + - RAY_ENABLE_NEW_SCHEDULER=0 language: java jdk: openjdk8 install: diff --git a/python/ray/worker.py b/python/ray/worker.py index 888cf680b..4086831ea 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -771,6 +771,8 @@ def init( driver_object_store_memory=_driver_object_store_memory, job_id=None, job_config=job_config) + if job_config and job_config.code_search_path: + global_worker.set_load_code_from_local(True) for hook in _post_init_hooks: hook() diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/client/JobClientImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/client/JobClientImpl.java index 4e27fdff5..68ca33ec1 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/client/JobClientImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/client/JobClientImpl.java @@ -44,9 +44,12 @@ public class JobClientImpl implements JobClient { if (submitResult.get()) { LOG.info("Finish submitting job: {}.", jobGraph.getJobName()); + } else { + throw new RuntimeException("submitting job failed"); } } catch (Exception e) { LOG.error("Failed to submit job: {}.", jobGraph.getJobName(), e); + throw new RuntimeException("submitting job failed", e); } } } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java index 2106b4d01..a1dd5b6bc 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/JobMaster.java @@ -157,7 +157,8 @@ public class JobMaster { scheduler = new JobSchedulerImpl(this); scheduler.scheduleJob(graphManager.getExecutionGraph()); } catch (Exception e) { - LOG.error("Failed to submit job.", e); + e.printStackTrace(); + LOG.error("Failed to submit job {}.", e, e); return false; } return true; diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java index 05fdb2d2c..6309bb334 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java @@ -1,5 +1,6 @@ package io.ray.streaming.runtime.master.scheduler; +import com.google.common.base.Preconditions; import io.ray.api.ActorHandle; import io.ray.streaming.runtime.config.StreamingConfig; import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; @@ -82,7 +83,7 @@ public class JobSchedulerImpl implements JobScheduler { Map vertexToContextMap = buildWorkersContext(executionGraph); // init workers - initWorkers(vertexToContextMap); + Preconditions.checkState(initWorkers(vertexToContextMap)); // init master initMaster(); @@ -119,17 +120,13 @@ public class JobSchedulerImpl implements JobScheduler { * @param vertexToContextMap vertex - context map */ protected boolean initWorkers(Map vertexToContextMap) { - boolean result; - try { - result = - workerLifecycleController.initWorkers( - vertexToContextMap, - jobConfig.masterConfig.schedulerConfig.workerInitiationWaitTimeoutMs()); - } catch (Exception e) { - LOG.error("Failed to initiate workers.", e); - return false; + boolean succeed; + int timeoutMs = jobConfig.masterConfig.schedulerConfig.workerInitiationWaitTimeoutMs(); + succeed = workerLifecycleController.initWorkers(vertexToContextMap, timeoutMs); + if (!succeed) { + LOG.error("Failed to initiate workers in {} milliseconds", timeoutMs); } - return result; + return succeed; } /** Start JobWorkers according to the physical plan. */ diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/python/PythonGateway.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/python/PythonGateway.java index fd59a0ffd..4656d98ce 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/python/PythonGateway.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/python/PythonGateway.java @@ -228,7 +228,7 @@ public class PythonGateway { public byte[] newInstance(byte[] classNameBytes) { String className = (String) serializer.deserialize(classNameBytes); try { - Class clz = Class.forName(className, true, this.getClass().getClassLoader()); + Class clz = Class.forName(className, true, Thread.currentThread().getContextClassLoader()); Object instance = clz.newInstance(); referenceMap.put(getReferenceId(instance), instance); return serializer.serialize(getReferenceId(instance)); diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java index 9e8f6318c..17ab4fe1e 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/DataReader.java @@ -246,8 +246,8 @@ public class DataReader { // kMessageBundleHeaderSize + kUniqueIDSize: // magicNum(4b) + bundleTs(8b) + lastMessageId(8b) + messageListSize(4b) - // + bundleType(4b) + rawBundleSize(4b) + channelID(20b) - static final int LENGTH = 4 + 8 + 8 + 4 + 4 + 4 + 20; + // + bundleType(4b) + rawBundleSize(4b) + channelID + static final int LENGTH = 4 + 8 + 8 + 4 + 4 + 4 + ChannelId.ID_LENGTH; private int magicNum; private long bundleTs; private long lastMessageId; diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/channel/ChannelId.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/channel/ChannelId.java index 3c4f8caf0..d3a4b8d71 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/channel/ChannelId.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/channel/ChannelId.java @@ -5,6 +5,7 @@ import com.google.common.base.FinalizableReferenceQueue; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.io.BaseEncoding; +import io.ray.api.id.ObjectId; import java.lang.ref.Reference; import java.nio.ByteBuffer; import java.util.Random; @@ -16,7 +17,7 @@ import sun.nio.ch.DirectBuffer; */ public class ChannelId { - public static final int ID_LENGTH = 20; + public static final int ID_LENGTH = ObjectId.LENGTH; private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue(); // This ensures that the FinalizablePhantomReference itself is not garbage-collected. private static final Set> references = Sets.newConcurrentHashSet(); @@ -82,15 +83,15 @@ public class ChannelId { } /** - * Generate channel name, which will be 20 character + * Generate channel name, which will be {@link ChannelId#ID_LENGTH} character * * @param fromTaskId upstream task id * @param toTaskId downstream task id Returns channel name */ public static String genIdStr(int fromTaskId, int toTaskId, long ts) { /* - | Head | Timestamp | Empty | From | To | - | 8 bytes | 4bytes | 4bytes| 2bytes| 2bytes | + | Head | Timestamp | Empty | From | To | padding | + | 8 bytes | 4bytes | 4bytes| 2bytes| 2bytes | | */ Preconditions.checkArgument( fromTaskId < Short.MAX_VALUE, @@ -99,7 +100,7 @@ public class ChannelId { Short.MAX_VALUE); Preconditions.checkArgument( toTaskId < Short.MAX_VALUE, "toTaskId %s is larger than %s", fromTaskId, Short.MAX_VALUE); - byte[] channelName = new byte[20]; + byte[] channelName = new byte[ID_LENGTH]; for (int i = 11; i >= 8; i--) { channelName[i] = (byte) (ts & 0xff); diff --git a/streaming/python/includes/transfer.pxi b/streaming/python/includes/transfer.pxi index 4952fd8b5..d7beef5e9 100644 --- a/streaming/python/includes/transfer.pxi +++ b/streaming/python/includes/transfer.pxi @@ -385,7 +385,7 @@ cdef c_vector[CObjectID] bytes_list_to_qid_vec(list py_queue_ids) except *: c_string q_id_data for q_id in py_queue_ids: q_id_data = q_id - assert q_id_data.size() == CObjectID.Size() + assert q_id_data.size() == CObjectID.Size(), f"{q_id_data.size()}, {CObjectID.Size()}" obj_id = CObjectID.FromBinary(q_id_data) queue_id_vec.push_back(obj_id) return queue_id_vec diff --git a/streaming/python/runtime/transfer.py b/streaming/python/runtime/transfer.py index 8091c1d21..4cb482de5 100644 --- a/streaming/python/runtime/transfer.py +++ b/streaming/python/runtime/transfer.py @@ -14,7 +14,7 @@ from ray._raylet import JavaFunctionDescriptor from ray._raylet import PythonFunctionDescriptor from ray._raylet import Language -CHANNEL_ID_LEN = 20 +CHANNEL_ID_LEN = ray.ObjectID.nil().size() logger = logging.getLogger(__name__) @@ -58,8 +58,8 @@ class ChannelID: @staticmethod def gen_id(from_index, to_index, ts): - """Generate channel id, which is 20 character""" - channel_id = bytearray(20) + """Generate channel id, which is `CHANNEL_ID_LEN` character""" + channel_id = bytearray(CHANNEL_ID_LEN) for i in range(11, 7, -1): channel_id[i] = ts & 0xff ts >>= 8 diff --git a/streaming/python/tests/test_hybrid_stream.py b/streaming/python/tests/test_hybrid_stream.py index 1a18b3fb1..cc1aa48b3 100644 --- a/streaming/python/tests/test_hybrid_stream.py +++ b/streaming/python/tests/test_hybrid_stream.py @@ -1,7 +1,5 @@ -import json import os import subprocess -import sys import ray from ray.streaming import StreamingContext @@ -31,12 +29,8 @@ def test_hybrid_stream(): "../../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar") jar_path = os.path.abspath(jar_path) print("jar_path", jar_path) - java_worker_options = json.dumps(["-classpath", jar_path]) - print("java_worker_options", java_worker_options) assert not ray.is_initialized() - ray.init( - job_config=ray.job_config.JobConfig(code_search_path=sys.path), - _java_worker_options=java_worker_options) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=[jar_path])) sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt" if os.path.exists(sink_file):