diff --git a/BUILD.bazel b/BUILD.bazel index a9f389fe7..4502adadb 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -367,6 +367,29 @@ cc_library( ], ) +# This header is used to wrap some streaming code so we can reduce suspicious +# symbol exports. +cc_library( + name = "exported_streaming_internal", + srcs = glob( + [ + "src/ray/streaming/streaming.cc", + ], + ), + hdrs = glob( + [ + "src/ray/streaming/streaming.h", + ], + ), + copts = COPTS, + strip_include_prefix = "src", + visibility = ["//visibility:public"], + deps = [ + ":core_worker_lib", + ], + alwayslink = 1, +) + cc_binary( name = "raylet", srcs = ["src/ray/raylet/main.cc"], @@ -1702,6 +1725,7 @@ pyx_library( ), deps = [ "//:core_worker_lib", + "//:exported_streaming_internal", "//:global_state_accessor_lib", "//:ray_util", "//:raylet_lib", @@ -1756,6 +1780,7 @@ cc_binary( visibility = ["//java:__subpackages__"], deps = [ "//:core_worker_lib", + "//:exported_streaming_internal", "//:global_state_accessor_lib", "//:src/ray/ray_exported_symbols.lds", "//:src/ray/ray_version_script.lds", diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 53be38a4c..6506ca8ed 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -75,7 +75,8 @@ class BaseID { protected: BaseID(const std::string &binary) { RAY_CHECK(binary.size() == Size() || binary.size() == 0) - << "expected size is " << Size() << ", but got " << binary.size(); + << "expected size is " << Size() << ", but got data " << binary << " of size " + << binary.size(); std::memcpy(const_cast(this->Data()), binary.data(), binary.size()); } // All IDs are immutable for hash evaluations. 
MutableData is only allow to use @@ -341,24 +342,25 @@ std::ostream &operator<<(std::ostream &os, const TaskID &id); std::ostream &operator<<(std::ostream &os, const ObjectID &id); std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id); -#define DEFINE_UNIQUE_ID(type) \ - class RAY_EXPORT type : public UniqueID { \ - public: \ - explicit type(const UniqueID &from) { \ - std::memcpy(&id_, from.Data(), kUniqueIDSize); \ - } \ - type() : UniqueID() {} \ - static type FromRandom() { return type(UniqueID::FromRandom()); } \ - static type FromBinary(const std::string &binary) { return type(binary); } \ - static type Nil() { return type(UniqueID::Nil()); } \ - static size_t Size() { return kUniqueIDSize; } \ - \ - private: \ - explicit type(const std::string &binary) { \ - RAY_CHECK(binary.size() == Size() || binary.size() == 0) \ - << "expected size is " << Size() << ", but got " << binary.size(); \ - std::memcpy(&id_, binary.data(), binary.size()); \ - } \ +#define DEFINE_UNIQUE_ID(type) \ + class RAY_EXPORT type : public UniqueID { \ + public: \ + explicit type(const UniqueID &from) { \ + std::memcpy(&id_, from.Data(), kUniqueIDSize); \ + } \ + type() : UniqueID() {} \ + static type FromRandom() { return type(UniqueID::FromRandom()); } \ + static type FromBinary(const std::string &binary) { return type(binary); } \ + static type Nil() { return type(UniqueID::Nil()); } \ + static size_t Size() { return kUniqueIDSize; } \ + \ + private: \ + explicit type(const std::string &binary) { \ + RAY_CHECK(binary.size() == Size() || binary.size() == 0) \ + << "expected size is " << Size() << ", but got data " << binary << " of size " \ + << binary.size(); \ + std::memcpy(&id_, binary.data(), binary.size()); \ + } \ }; #include "ray/common/id_def.h" @@ -385,7 +387,8 @@ T BaseID::FromRandom() { template T BaseID::FromBinary(const std::string &binary) { RAY_CHECK(binary.size() == T::Size() || binary.size() == 0) - << "expected size is " << T::Size() << ", but got " << 
binary.size(); + << "expected size is " << T::Size() << ", but got data " << binary << " of size " + << binary.size(); T t; std::memcpy(t.MutableData(), binary.data(), binary.size()); return t; diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 8f880e077..8f0581b65 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -120,7 +120,7 @@ ObjectID TaskSpecification::ReturnId(size_t return_index) const { } bool TaskSpecification::ArgByRef(size_t arg_index) const { - return message_->args(arg_index).object_ref().object_id() != ""; + return message_->args(arg_index).has_object_ref(); } ObjectID TaskSpecification::ArgId(size_t arg_index) const { diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 195e5c8dd..4a1093991 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -41,7 +41,7 @@ class TaskArgByValue : public TaskArg { /// /// \param[in] value Value of the argument. /// \return The task argument. - TaskArgByValue(const std::shared_ptr &value) : value_(value) { + explicit TaskArgByValue(const std::shared_ptr &value) : value_(value) { RAY_CHECK(value) << "Value can't be null."; } diff --git a/src/ray/core_worker/transport/dependency_resolver.cc b/src/ray/core_worker/transport/dependency_resolver.cc index e3d480d43..247bd648f 100644 --- a/src/ray/core_worker/transport/dependency_resolver.cc +++ b/src/ray/core_worker/transport/dependency_resolver.cc @@ -46,7 +46,7 @@ void InlineDependencies( if (!it->second->IsInPlasmaError()) { // The object has not been promoted to plasma. Inline the object by // clearing the reference and replacing it with the raw value. 
- mutable_arg->mutable_object_ref()->Clear(); + mutable_arg->clear_object_ref(); if (it->second->HasData()) { const auto &data = it->second->GetData(); mutable_arg->set_data(data->Data(), data->Size()); diff --git a/src/ray/ray_exported_symbols.lds b/src/ray/ray_exported_symbols.lds index 446542103..d67cb8b9c 100644 --- a/src/ray/ray_exported_symbols.lds +++ b/src/ray/ray_exported_symbols.lds @@ -21,9 +21,11 @@ *ray*TaskID* *ray*ActorID* *ray*ObjectID* +*ray*ObjectReference* # Others *ray*CoreWorker* *PyInit* *init_raylet* *Java* *JNI_* +*ray*streaming* diff --git a/src/ray/ray_version_script.lds b/src/ray/ray_version_script.lds index c778015f5..c53eb528f 100644 --- a/src/ray/ray_version_script.lds +++ b/src/ray/ray_version_script.lds @@ -23,11 +23,13 @@ VERSION_1.0 { *ray*TaskID*; *ray*ActorID*; *ray*ObjectID*; + *ray*ObjectReference*; # Others *ray*CoreWorker*; *PyInit*; *init_raylet*; *Java*; *JNI_*; + *ray*streaming*; local: *; }; diff --git a/src/ray/streaming/streaming.cc b/src/ray/streaming/streaming.cc new file mode 100644 index 000000000..0a53fd8bc --- /dev/null +++ b/src/ray/streaming/streaming.cc @@ -0,0 +1,34 @@ +#include "ray/streaming/streaming.h" +#include "ray/core_worker/core_worker.h" + +namespace ray { +namespace streaming { + +void SendInternal(const ActorID &peer_actor_id, std::shared_ptr buffer, + RayFunction &function, int return_num, + std::vector &return_ids) { + std::unordered_map resources; + std::string name = function.GetFunctionDescriptor()->DefaultTaskName(); + TaskOptions options{name, return_num, resources}; + + char meta_data[3] = {'R', 'A', 'W'}; + std::shared_ptr meta = + std::make_shared((uint8_t *)meta_data, 3, true); + + std::vector> args; + if (function.GetLanguage() == Language::PYTHON) { + auto dummy = "__RAY_DUMMY__"; + std::shared_ptr dummyBuffer = + std::make_shared((uint8_t *)dummy, 13, true); + args.emplace_back(new TaskArgByValue(std::make_shared( + std::move(dummyBuffer), meta, std::vector(), true))); + } + 
args.emplace_back(new TaskArgByValue(std::make_shared( + std::move(buffer), meta, std::vector(), true))); + + std::vector> results; + CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function, args, + options, &return_ids); +} +} // namespace streaming +} // namespace ray diff --git a/src/ray/streaming/streaming.h b/src/ray/streaming/streaming.h new file mode 100644 index 000000000..986fc3ca0 --- /dev/null +++ b/src/ray/streaming/streaming.h @@ -0,0 +1,21 @@ +#pragma once +#include "ray/common/buffer.h" +#include "ray/common/id.h" +#include "ray/core_worker/common.h" + +// This header is used to wrap some streaming code so we can reduce suspicious +// symbol exports. +namespace ray { +namespace streaming { + +/// Send buffer internal +/// \param[in] buffer buffer to be sent. +/// \param[in] function the function descriptor of peer's function. +/// \param[in] return_num return value number of the call. +/// \param[out] return_ids return ids from SubmitActorTask. +void SendInternal(const ActorID &peer_actor_id, std::shared_ptr buffer, + RayFunction &function, int return_num, + std::vector &return_ids); + +} // namespace streaming +} // namespace ray \ No newline at end of file diff --git a/streaming/BUILD.bazel b/streaming/BUILD.bazel index 6bb71ae98..175643069 100644 --- a/streaming/BUILD.bazel +++ b/streaming/BUILD.bazel @@ -67,6 +67,13 @@ cc_binary( deps = ["//:core_worker_lib"], ) +cc_binary( + name = "exported_streaming_internal.so", + copts = COPTS, + linkshared = 1, + deps = ["//:exported_streaming_internal"], +) + cc_library( name = "streaming_util", srcs = glob([ @@ -143,9 +150,11 @@ cc_library( "@bazel_tools//src/conditions:windows": [ # TODO(mehrdadn): This is to resolve symbols on Windows for now. Should remove this later. (See d7f8d18.) 
"//:core_worker_lib", + "//:exported_streaming_internal", ], "//conditions:default": [ "core_worker_lib.so", + "exported_streaming_internal.so", ], }), ) @@ -233,6 +242,7 @@ cc_library( ) test_common_deps = [ + "//:exported_streaming_internal", ":streaming_lib", "//:ray_common", "//:ray_util", @@ -348,6 +358,7 @@ genrule( GENERATED_DIR="streaming/python/generated" mkdir -p "$$GENERATED_DIR" touch "$$GENERATED_DIR/__init__.py" + # Use this `sed` command to change the import path in the generated file. sed -i -E 's/from streaming.src.protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py" sed -i -E 's/from protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py" date > $@ diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java index 2238e82aa..29aebdc29 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java @@ -5,6 +5,8 @@ import io.ray.runtime.util.JniUtils; import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,4 +39,29 @@ public class EnvUtil { JniUtils.loadLibrary("streaming_java"); } + /** + * Execute an external command, waiting up to the given timeout for it to exit. + * + * @param command The command to run, followed by its arguments. + * @param waitTimeoutSeconds Maximum time to wait for the command to exit, in seconds. + * @return Whether the command exited with status 0. 
+ */ + public static boolean executeCommand(List command, int waitTimeoutSeconds) { + try { + ProcessBuilder processBuilder = new ProcessBuilder(command) + .redirectOutput(ProcessBuilder.Redirect.INHERIT) + .redirectError(ProcessBuilder.Redirect.INHERIT); + Process process = processBuilder.start(); + boolean exit = process.waitFor(waitTimeoutSeconds, TimeUnit.SECONDS); + if (!exit) { + // destroyForcibly() may return before the process has terminated; wait for it so + // that exitValue() below cannot throw IllegalThreadStateException. + process.destroyForcibly().waitFor(); + } + return process.exitValue() == 0; + } catch (Exception e) { + throw new RuntimeException("Error executing command " + String.join(" ", command), e); + } + } + } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/ResourceUtil.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/ResourceUtil.java index 66777b702..9b8ca22ae 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/ResourceUtil.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/ResourceUtil.java @@ -1,7 +1,6 @@ package io.ray.streaming.runtime.util; import com.sun.management.OperatingSystemMXBean; -import io.ray.api.id.UniqueId; import io.ray.streaming.runtime.core.resource.Container; import io.ray.streaming.runtime.core.resource.ContainerId; import java.io.BufferedInputStream; diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/HybridStreamTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/HybridStreamTest.java index 5fe049aff..4db1a1f0f 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/HybridStreamTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/HybridStreamTest.java @@ -1,11 +1,14 @@ package io.ray.streaming.runtime.demo; +import 
com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import io.ray.api.Ray; import io.ray.streaming.api.context.StreamingContext; 
import io.ray.streaming.api.function.impl.FilterFunction; import io.ray.streaming.api.function.impl.MapFunction; import io.ray.streaming.api.function.impl.SinkFunction; import io.ray.streaming.api.stream.DataStreamSource; +import io.ray.streaming.runtime.util.EnvUtil; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -42,6 +45,8 @@ public class HybridStreamTest { @Test(timeOut = 60000) public void testHybridDataStream() throws Exception { Ray.shutdown(); + Preconditions.checkArgument( + EnvUtil.executeCommand(ImmutableList.of("ray", "stop"), 5)); String sinkFileName = "/tmp/testHybridDataStream.txt"; Files.deleteIfExists(Paths.get(sinkFileName)); diff --git a/streaming/python/tests/test_hybrid_stream.py b/streaming/python/tests/test_hybrid_stream.py index cdff228b8..7d79b9a0e 100644 --- a/streaming/python/tests/test_hybrid_stream.py +++ b/streaming/python/tests/test_hybrid_stream.py @@ -1,8 +1,10 @@ import json +import os +import subprocess + import ray from ray.streaming import StreamingContext -import subprocess -import os +from ray.test_utils import wait_for_condition def map_func1(x): @@ -32,9 +34,9 @@ def test_hybrid_stream(): print("java_worker_options", java_worker_options) assert not ray.is_initialized() ray.init( - load_code_from_local=True, - include_java=True, - java_worker_options=java_worker_options, + _load_code_from_local=True, + _include_java=True, + _java_worker_options=java_worker_options, _system_config={"num_workers_per_process_java": 1}) sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt" @@ -45,6 +47,7 @@ def test_hybrid_stream(): print("HybridStreamTest", x) with open(sink_file, "a") as f: f.write(str(x)) + f.flush() ctx = StreamingContext.Builder().build() ctx.from_values("a", "b", "c") \ @@ -54,14 +57,23 @@ def test_hybrid_stream(): .as_python_stream() \ .sink(sink_func) ctx.submit("HybridStreamTest") - import time - time.sleep(3) + + def check_succeed(): + if os.path.exists(sink_file): + import time 
+ time.sleep(3) # Wait all data be written + with open(sink_file, "r") as f: + result = f.read() + assert "a" in result + assert "b" not in result + assert "c" in result + print("Execution succeed") + return True + return False + + wait_for_condition(check_succeed, timeout=60, retry_interval_ms=1000) + print("Execution succeed") ray.shutdown() - with open(sink_file, "r") as f: - result = f.read() - assert "a" in result - assert "b" not in result - assert "c" in result if __name__ == "__main__": diff --git a/streaming/src/queue/transport.cc b/streaming/src/queue/transport.cc index 6bb378e20..5f1c5524b 100644 --- a/streaming/src/queue/transport.cc +++ b/streaming/src/queue/transport.cc @@ -1,6 +1,7 @@ #include "queue/transport.h" #include "queue/utils.h" +#include "ray/streaming/streaming.h" namespace ray { namespace streaming { @@ -8,43 +9,18 @@ namespace streaming { static constexpr int TASK_OPTION_RETURN_NUM_0 = 0; static constexpr int TASK_OPTION_RETURN_NUM_1 = 1; -void Transport::SendInternal(std::shared_ptr buffer, - RayFunction &function, int return_num, - std::vector &return_ids) { - std::unordered_map resources; - std::string name = function.GetFunctionDescriptor()->DefaultTaskName(); - TaskOptions options{name, return_num, resources}; - - char meta_data[3] = {'R', 'A', 'W'}; - std::shared_ptr meta = - std::make_shared((uint8_t *)meta_data, 3, true); - - std::vector> args; - if (function.GetLanguage() == Language::PYTHON) { - auto dummy = "__RAY_DUMMY__"; - std::shared_ptr dummyBuffer = - std::make_shared((uint8_t *)dummy, 13, true); - args.emplace_back(new TaskArgByValue(std::make_shared( - std::move(dummyBuffer), meta, std::vector(), true))); - } - args.emplace_back(new TaskArgByValue(std::make_shared( - std::move(buffer), meta, std::vector(), true))); - - std::vector> results; - CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id_, function, args, - options, &return_ids); -} - void Transport::Send(std::shared_ptr buffer) { 
STREAMING_LOG(DEBUG) << "Transport::Send buffer size: " << buffer->Size(); std::vector return_ids; - SendInternal(std::move(buffer), async_func_, TASK_OPTION_RETURN_NUM_0, return_ids); + ray::streaming::SendInternal(peer_actor_id_, std::move(buffer), async_func_, + TASK_OPTION_RETURN_NUM_0, return_ids); } std::shared_ptr Transport::SendForResult( std::shared_ptr buffer, int64_t timeout_ms) { std::vector return_ids; - SendInternal(buffer, sync_func_, TASK_OPTION_RETURN_NUM_1, return_ids); + ray::streaming::SendInternal(peer_actor_id_, buffer, sync_func_, + TASK_OPTION_RETURN_NUM_1, return_ids); std::vector> results; Status get_st = diff --git a/streaming/src/queue/transport.h b/streaming/src/queue/transport.h index 111e1bb96..818a9f6b8 100644 --- a/streaming/src/queue/transport.h +++ b/streaming/src/queue/transport.h @@ -47,16 +47,6 @@ class Transport { std::shared_ptr SendForResultWithRetry( std::shared_ptr buffer, int retry_cnt, int64_t timeout_ms); - private: - /// Send buffer internal - /// \param[in] buffer buffer to be sent. - /// \param[in] function the function descriptor of peer's function. - /// \param[in] return_num return value number of the call. - /// \param[out] return_ids return ids from SubmitActorTask. - virtual void SendInternal(std::shared_ptr buffer, - RayFunction &function, int return_num, - std::vector &return_ids); - private: WorkerID worker_id_; ActorID peer_actor_id_;