diff --git a/streaming/java/BUILD.bazel b/streaming/java/BUILD.bazel index 98aaacf6f..a3b37f9dc 100644 --- a/streaming/java/BUILD.bazel +++ b/streaming/java/BUILD.bazel @@ -37,10 +37,6 @@ define_java_module( "//java:io_ray_ray_api", ":io_ray_ray_streaming-api", "@ray_streaming_maven//:com_google_guava_guava", - "@ray_streaming_maven//:org_mockito_mockito_all", - "@ray_streaming_maven//:org_powermock_powermock_api_mockito", - "@ray_streaming_maven//:org_powermock_powermock_module_testng", - "@ray_streaming_maven//:org_projectlombok_lombok", "@ray_streaming_maven//:org_slf4j_slf4j_api", "@ray_streaming_maven//:org_slf4j_slf4j_log4j12", "@ray_streaming_maven//:org_testng_testng", @@ -114,7 +110,6 @@ define_java_module( "@ray_streaming_maven//:org_mockito_mockito_all", "@ray_streaming_maven//:org_powermock_powermock_api_mockito", "@ray_streaming_maven//:org_powermock_powermock_module_testng", - "@ray_streaming_maven//:org_projectlombok_lombok", ], visibility = ["//visibility:public"], deps = [ @@ -127,7 +122,6 @@ define_java_module( "@ray_streaming_maven//:de_ruedigermoeller_fst", "@ray_streaming_maven//:org_aeonbits_owner_owner", "@ray_streaming_maven//:org_msgpack_msgpack_core", - "@ray_streaming_maven//:org_projectlombok_lombok", "@ray_streaming_maven//:org_slf4j_slf4j_api", "@ray_streaming_maven//:org_slf4j_slf4j_log4j12", ], @@ -147,7 +141,6 @@ java_binary( "@ray_streaming_maven//:org_mockito_mockito_all", "@ray_streaming_maven//:org_powermock_powermock_api_mockito", "@ray_streaming_maven//:org_powermock_powermock_module_testng", - "@ray_streaming_maven//:org_projectlombok_lombok", "@ray_streaming_maven//:org_testng_testng", ], ) diff --git a/streaming/java/streaming-api/pom.xml b/streaming/java/streaming-api/pom.xml index 1f032a662..9c8ff9764 100755 --- a/streaming/java/streaming-api/pom.xml +++ b/streaming/java/streaming-api/pom.xml @@ -27,26 +27,6 @@ guava 27.0.1-jre - - org.mockito - mockito-all - 1.10.19 - - - org.powermock - powermock-api-mockito - 1.6.6 - - - org.powermock - powermock-module-testng - 1.6.6 - - - org.projectlombok - lombok - 1.16.20 - org.slf4j slf4j-api diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/util/Config.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/util/Config.java index 789674c76..9209d5d0d 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/util/Config.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/util/Config.java @@ -15,7 +15,6 @@ public class Config { public static final String STREAMING_JOB_NAME = "streaming.job.name"; public static final String STREAMING_OP_NAME = "streaming.op_name"; - public static final String TASK_JOB_ID = "streaming.task_job_id"; public static final String STREAMING_WORKER_NAME = "streaming.worker_name"; // channel diff --git a/streaming/java/streaming-runtime/pom.xml b/streaming/java/streaming-runtime/pom.xml index cefdca7cf..a38416401 100755 --- a/streaming/java/streaming-runtime/pom.xml +++ b/streaming/java/streaming-runtime/pom.xml @@ -1,5 +1,5 @@ - + @@ -76,11 +76,6 @@ powermock-module-testng 1.6.6 - - org.projectlombok - lombok - 1.16.20 - org.slf4j slf4j-api diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java index 519f58979..dc341518c 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/JobSchedulerImpl.java @@ -55,9 +55,9 @@ public class JobSchedulerImpl implements JobScheduler { } /** - * Allocating job worker resource then create job worker actor + * Allocate job workers' resource then create job workers' actor. * - * @param executionGraph + * @param executionGraph the physical plan */ protected void prepareResourceAndCreateWorker(ExecutionGraph executionGraph) { List containers = resourceManager.getRegisteredContainers(); diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java index 39aff6c36..3a43644cd 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/master/scheduler/controller/WorkerLifecycleController.java @@ -53,7 +53,7 @@ public class WorkerLifecycleController { RayActor actor = null; // TODO (datayjz): ray create actor - if (null == actor) { + if (null == actor) { LOG.error("Create worker actor failed."); return false; } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/ChannelUtils.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/ChannelUtils.java index 0ee016ef1..c5e1ffadf 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/ChannelUtils.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/ChannelUtils.java @@ -14,9 +14,6 @@ public class ChannelUtils { if (conf.containsKey(Config.STREAMING_JOB_NAME)) { builder.setJobName(conf.get(Config.STREAMING_JOB_NAME)); } - if (conf.containsKey(Config.TASK_JOB_ID)) { - builder.setTaskJobId(conf.get(Config.TASK_JOB_ID)); - } if (conf.containsKey(Config.STREAMING_WORKER_NAME)) { builder.setWorkerName(conf.get(Config.STREAMING_WORKER_NAME)); } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java index e3ba8d034..7bff5101e 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java @@ -49,11 +49,10 @@ public abstract class StreamTask implements Runnable { private void prepareTask() { Map queueConf = new HashMap<>(); worker.getConfig().forEach((k, v) -> queueConf.put(k, String.valueOf(v))); - String queueSize = (String) worker.getConfig() + String queueSize = worker.getConfig() .getOrDefault(Config.CHANNEL_SIZE, Config.CHANNEL_SIZE_DEFAULT); queueConf.put(Config.CHANNEL_SIZE, queueSize); - queueConf.put(Config.TASK_JOB_ID, Ray.getRuntimeContext().getCurrentJobId().toString()); - String channelType = (String) worker.getConfig() + String channelType = worker.getConfig() .getOrDefault(Config.CHANNEL_TYPE, Config.MEMORY_CHANNEL); queueConf.put(Config.CHANNEL_TYPE, channelType); diff --git a/streaming/java/test.sh b/streaming/java/test.sh index a352becba..e3225452c 100755 --- a/streaming/java/test.sh +++ b/streaming/java/test.sh @@ -23,9 +23,15 @@ bazel test //streaming/java:all --test_tag_filters="checkstyle" --build_tests_on echo "Running streaming tests." java -cp "$ROOT_DIR"/../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar\ - org.testng.TestNG -d /tmp/ray_streaming_java_test_output "$ROOT_DIR"/testng.xml || exit_code=$? + org.testng.TestNG -d /tmp/ray_streaming_java_test_output "$ROOT_DIR"/testng.xml +exit_code=$? echo "Streaming TestNG results" -cat /tmp/ray_streaming_java_test_output/testng-results.xml +if [ -f "/tmp/ray_streaming_java_test_output/testng-results.xml" ] ; then + cat /tmp/ray_streaming_java_test_output/testng-results.xml +else + echo "Test result file doesn't exist" +fi + # exit_code == 2 means there are skipped tests. if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then exit $exit_code diff --git a/streaming/python/config.py b/streaming/python/config.py index 8f54463b6..d7af6230e 100644 --- a/streaming/python/config.py +++ b/streaming/python/config.py @@ -1,7 +1,6 @@ class Config: STREAMING_JOB_NAME = "streaming.job.name" STREAMING_OP_NAME = "streaming.op_name" - TASK_JOB_ID = "streaming.task_job_id" STREAMING_WORKER_NAME = "streaming.worker_name" # channel CHANNEL_TYPE = "channel_type" diff --git a/streaming/python/runtime/task.py b/streaming/python/runtime/task.py index 87dfb16b0..ee0aeb561 100644 --- a/streaming/python/runtime/task.py +++ b/streaming/python/runtime/task.py @@ -3,7 +3,6 @@ import pickle import threading from abc import ABC, abstractmethod -import ray from ray.streaming.collector import OutputCollector from ray.streaming.config import Config from ray.streaming.context import RuntimeContextImpl @@ -31,8 +30,6 @@ class StreamTask(ABC): self.worker.config.get(Config.CHANNEL_SIZE, Config.CHANNEL_SIZE_DEFAULT)) channel_conf[Config.CHANNEL_SIZE] = channel_size - channel_conf[Config.TASK_JOB_ID] = ray.runtime_context.\ - _get_runtime_context().current_driver_id channel_conf[Config.CHANNEL_TYPE] = self.worker.config \ .get(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL) diff --git a/streaming/python/runtime/transfer.py b/streaming/python/runtime/transfer.py index 9da4be35a..f40ea087a 100644 --- a/streaming/python/runtime/transfer.py +++ b/streaming/python/runtime/transfer.py @@ -147,29 +147,21 @@ class ChannelCreationParametersBuilder: wrap initial parameters needed by a streaming queue """ _java_reader_async_function_descriptor = JavaFunctionDescriptor( - "org.ray.streaming.runtime.worker", - "onReaderMessage", "([B)V") + "io.ray.streaming.runtime.worker", "onReaderMessage", "([B)V") _java_reader_sync_function_descriptor = JavaFunctionDescriptor( - "org.ray.streaming.runtime.worker", - "onReaderMessageSync", "([B)[B") + "io.ray.streaming.runtime.worker", "onReaderMessageSync", "([B)[B") _java_writer_async_function_descriptor = JavaFunctionDescriptor( - "org.ray.streaming.runtime.worker", - "onWriterMessage", "([B)V") + "io.ray.streaming.runtime.worker", "onWriterMessage", "([B)V") _java_writer_sync_function_descriptor = JavaFunctionDescriptor( - "org.ray.streaming.runtime.worker", - "onWriterMessageSync", "([B)[B") + "io.ray.streaming.runtime.worker", "onWriterMessageSync", "([B)[B") _python_reader_async_function_descriptor = PythonFunctionDescriptor( - "ray.streaming.runtime.core.worker", - "on_reader_message", "JobWorker") + "ray.streaming.runtime.worker", "on_reader_message", "JobWorker") _python_reader_sync_function_descriptor = PythonFunctionDescriptor( - "ray.streaming.runtime.core.worker", - "on_reader_message_sync", "JobWorker") + "ray.streaming.runtime.worker", "on_reader_message_sync", "JobWorker") _python_writer_async_function_descriptor = PythonFunctionDescriptor( - "ray.streaming.runtime.core.worker", - "on_writer_message", "JobWorker") + "ray.streaming.runtime.worker", "on_writer_message", "JobWorker") _python_writer_sync_function_descriptor = PythonFunctionDescriptor( - "ray.streaming.runtime.core.worker", - "on_writer_message_sync", "JobWorker") + "ray.streaming.runtime.worker", "on_writer_message_sync", "JobWorker") def get_parameters(self): return self._parameters @@ -193,8 +185,8 @@ class ChannelCreationParametersBuilder: self._python_reader_sync_function_descriptor) return self - def build_parameters(self, actors, java_async_func, - java_sync_func, py_async_func, py_sync_func): + def build_parameters(self, actors, java_async_func, java_sync_func, + py_async_func, py_sync_func): for handle in actors: parameter = None if handle._ray_actor_language == Language.PYTHON: @@ -208,16 +200,16 @@ class ChannelCreationParametersBuilder: @staticmethod def set_python_writer_function_descriptor(async_function, sync_function): - ChannelCreationParametersBuilder.\ + ChannelCreationParametersBuilder. \ _python_writer_async_function_descriptor = async_function - ChannelCreationParametersBuilder.\ + ChannelCreationParametersBuilder. \ _python_writer_sync_function_descriptor = sync_function @staticmethod def set_python_reader_function_descriptor(async_function, sync_function): - ChannelCreationParametersBuilder.\ + ChannelCreationParametersBuilder. \ _python_reader_async_function_descriptor = async_function - ChannelCreationParametersBuilder.\ + ChannelCreationParametersBuilder. \ _python_reader_sync_function_descriptor = sync_function @@ -251,8 +243,7 @@ class DataWriter: is_mock = conf[Config.CHANNEL_TYPE] == Config.MEMORY_CHANNEL self.writer = _streaming.DataWriter.create( py_output_channels, creation_parameters.get_parameters(), - channel_size, py_msg_ids, - config_bytes, is_mock) + channel_size, py_msg_ids, config_bytes, is_mock) logger.info("create DataWriter succeed") @@ -307,8 +298,8 @@ class DataReader: is_mock = conf[Config.CHANNEL_TYPE] == Config.MEMORY_CHANNEL self.reader = _streaming.DataReader.create( py_input_channels, creation_parameters.get_parameters(), - py_seq_ids, py_msg_ids, - timer_interval, is_recreate, config_bytes, is_mock) + py_seq_ids, py_msg_ids, timer_interval, is_recreate, config_bytes, + is_mock) logger.info("create DataReader succeed") def read(self, timeout_millis): @@ -344,9 +335,6 @@ def _to_native_conf(conf): config = streaming_pb.StreamingConfig() if Config.STREAMING_JOB_NAME in conf: config.job_name = conf[Config.STREAMING_JOB_NAME] - if Config.TASK_JOB_ID in conf: - job_id = conf[Config.TASK_JOB_ID] - config.task_job_id = job_id.hex() if Config.STREAMING_WORKER_NAME in conf: config.worker_name = conf[Config.STREAMING_WORKER_NAME] if Config.STREAMING_OP_NAME in conf: diff --git a/streaming/python/tests/test_direct_transfer.py b/streaming/python/tests/test_direct_transfer.py index 12d311528..309d6f054 100644 --- a/streaming/python/tests/test_direct_transfer.py +++ b/streaming/python/tests/test_direct_transfer.py @@ -19,10 +19,7 @@ class Worker: self.reader = None def init_writer(self, output_channel, reader_actor): - conf = { - Config.TASK_JOB_ID: ray.worker.global_worker.current_job_id, - Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL - } + conf = {Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL} reader_async_func = PythonFunctionDescriptor( __name__, self.on_reader_message.__name__, self.__class__.__name__) reader_sync_func = PythonFunctionDescriptor( @@ -36,10 +33,7 @@ class Worker: self.output_channel_id = transfer.ChannelID(output_channel) def init_reader(self, input_channel, writer_actor): - conf = { - Config.TASK_JOB_ID: ray.worker.global_worker.current_job_id, - Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL - } + conf = {Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL} writer_async_func = PythonFunctionDescriptor( __name__, self.on_writer_message.__name__, self.__class__.__name__) writer_sync_func = PythonFunctionDescriptor( diff --git a/streaming/python/tests/test_word_count.py b/streaming/python/tests/test_word_count.py index 5758b9336..d86595cf4 100644 --- a/streaming/python/tests/test_word_count.py +++ b/streaming/python/tests/test_word_count.py @@ -1,3 +1,4 @@ +import os import ray from ray.streaming import StreamingContext @@ -21,5 +22,37 @@ def test_word_count(): ray.shutdown() +def test_simple_word_count(): + ray.init(load_code_from_local=True, include_java=True) + ctx = StreamingContext.Builder() \ + .build() + sink_file = "/tmp/ray_streaming_test_simple_word_count.txt" + if os.path.exists(sink_file): + os.remove(sink_file) + + def sink_func(x): + with open(sink_file, "a") as f: + f.write("{}:{},".format(x[0], x[1])) + + ctx.from_values("a", "b", "c") \ + .set_parallelism(1) \ + .flat_map(lambda x: [x, x]) \ + .map(lambda x: (x, 1)) \ + .key_by(lambda x: x[0]) \ + .reduce(lambda old_value, new_value: + (old_value[0], old_value[1] + new_value[1])) \ + .sink(sink_func) + ctx.submit("word_count") + import time + time.sleep(3) + ray.shutdown() + with open(sink_file, "r") as f: + result = f.read() + assert "a:2" in result + assert "b:2" in result + assert "c:2" in result + + if __name__ == "__main__": - test_word_count() + # test_word_count() + test_simple_word_count() diff --git a/streaming/src/config/streaming_config.cc b/streaming/src/config/streaming_config.cc index c668c9b79..90ad2e62c 100644 --- a/streaming/src/config/streaming_config.cc +++ b/streaming/src/config/streaming_config.cc @@ -18,10 +18,6 @@ void StreamingConfig::FromProto(const uint8_t *data, uint32_t size) { if (!config.job_name().empty()) { SetJobName(config.job_name()); } - if (!config.task_job_id().empty()) { - STREAMING_CHECK(config.task_job_id().size() == 2 * JobID::Size()); - SetTaskJobId(config.task_job_id()); - } if (!config.worker_name().empty()) { SetWorkerName(config.worker_name()); } diff --git a/streaming/src/config/streaming_config.h b/streaming/src/config/streaming_config.h index 474d7571e..ba4a77228 100644 --- a/streaming/src/config/streaming_config.h +++ b/streaming/src/config/streaming_config.h @@ -30,8 +30,6 @@ class StreamingConfig { std::string worker_name_ = "DEFAULT_WORKER_NAME"; - std::string task_job_id_ = JobID::Nil().Hex(); - // Default flow control type is unconsumed sequence flow control. More detail // introducation and implemention in ray/streaming/src/flow_control.h. streaming::proto::FlowControlType flow_control_type_ = @@ -50,7 +48,6 @@ class StreamingConfig { TYPE Get##NAME() const { return VALUE; } \ void Set##NAME(TYPE value) { VALUE = value; } - DECL_GET_SET_PROPERTY(const std::string &, TaskJobId, task_job_id_) DECL_GET_SET_PROPERTY(const std::string &, WorkerName, worker_name_) DECL_GET_SET_PROPERTY(const std::string &, OpName, op_name_) DECL_GET_SET_PROPERTY(uint32_t, EmptyMessageTimeInterval, empty_message_time_interval_) diff --git a/streaming/src/data_writer.cc b/streaming/src/data_writer.cc index a7578f219..af840c19a 100644 --- a/streaming/src/data_writer.cc +++ b/streaming/src/data_writer.cc @@ -134,12 +134,7 @@ StreamingStatus DataWriter::Init(const std::vector &queue_id_vec, const std::vector &channel_message_id_vec, const std::vector &queue_size_vec) { STREAMING_CHECK(!queue_id_vec.empty() && !channel_message_id_vec.empty()); - - ray::JobID job_id = - JobID::FromBinary(Util::Hexqid2str(runtime_context_->GetConfig().GetTaskJobId())); - - STREAMING_LOG(INFO) << "Job name => " << runtime_context_->GetConfig().GetJobName() - << ", job id => " << job_id; + STREAMING_LOG(INFO) << "Job name => " << runtime_context_->GetConfig().GetJobName(); output_queue_ids_ = queue_id_vec; transfer_config_->Set(ConfigEnum::QUEUE_ID_VECTOR, queue_id_vec); diff --git a/streaming/src/protobuf/streaming.proto b/streaming/src/protobuf/streaming.proto index 581861c10..e79143556 100644 --- a/streaming/src/protobuf/streaming.proto +++ b/streaming/src/protobuf/streaming.proto @@ -29,7 +29,6 @@ enum FlowControlType { // all string in this message is ASCII string message StreamingConfig { string job_name = 1; - string task_job_id = 2; string worker_name = 3; string op_name = 4; NodeType role = 5;