mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 17:49:47 +08:00
[Streaming] fix streaming ci (#9675)
This commit is contained in:
+25
@@ -367,6 +367,29 @@ cc_library(
|
||||
],
|
||||
)
|
||||
|
||||
# This header is used to warp some streaming code so we can reduce suspicious
|
||||
# symbols export.
|
||||
cc_library(
|
||||
name = "exported_streaming_internal",
|
||||
srcs = glob(
|
||||
[
|
||||
"src/ray/streaming/streaming.cc",
|
||||
],
|
||||
),
|
||||
hdrs = glob(
|
||||
[
|
||||
"src/ray/streaming/streaming.h",
|
||||
],
|
||||
),
|
||||
copts = COPTS,
|
||||
strip_include_prefix = "src",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
":core_worker_lib",
|
||||
],
|
||||
alwayslink = 1,
|
||||
)
|
||||
|
||||
cc_binary(
|
||||
name = "raylet",
|
||||
srcs = ["src/ray/raylet/main.cc"],
|
||||
@@ -1702,6 +1725,7 @@ pyx_library(
|
||||
),
|
||||
deps = [
|
||||
"//:core_worker_lib",
|
||||
"//:exported_streaming_internal",
|
||||
"//:global_state_accessor_lib",
|
||||
"//:ray_util",
|
||||
"//:raylet_lib",
|
||||
@@ -1756,6 +1780,7 @@ cc_binary(
|
||||
visibility = ["//java:__subpackages__"],
|
||||
deps = [
|
||||
"//:core_worker_lib",
|
||||
"//:exported_streaming_internal",
|
||||
"//:global_state_accessor_lib",
|
||||
"//:src/ray/ray_exported_symbols.lds",
|
||||
"//:src/ray/ray_version_script.lds",
|
||||
|
||||
+23
-20
@@ -75,7 +75,8 @@ class BaseID {
|
||||
protected:
|
||||
BaseID(const std::string &binary) {
|
||||
RAY_CHECK(binary.size() == Size() || binary.size() == 0)
|
||||
<< "expected size is " << Size() << ", but got " << binary.size();
|
||||
<< "expected size is " << Size() << ", but got data " << binary << " of size "
|
||||
<< binary.size();
|
||||
std::memcpy(const_cast<uint8_t *>(this->Data()), binary.data(), binary.size());
|
||||
}
|
||||
// All IDs are immutable for hash evaluations. MutableData is only allow to use
|
||||
@@ -341,24 +342,25 @@ std::ostream &operator<<(std::ostream &os, const TaskID &id);
|
||||
std::ostream &operator<<(std::ostream &os, const ObjectID &id);
|
||||
std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id);
|
||||
|
||||
#define DEFINE_UNIQUE_ID(type) \
|
||||
class RAY_EXPORT type : public UniqueID { \
|
||||
public: \
|
||||
explicit type(const UniqueID &from) { \
|
||||
std::memcpy(&id_, from.Data(), kUniqueIDSize); \
|
||||
} \
|
||||
type() : UniqueID() {} \
|
||||
static type FromRandom() { return type(UniqueID::FromRandom()); } \
|
||||
static type FromBinary(const std::string &binary) { return type(binary); } \
|
||||
static type Nil() { return type(UniqueID::Nil()); } \
|
||||
static size_t Size() { return kUniqueIDSize; } \
|
||||
\
|
||||
private: \
|
||||
explicit type(const std::string &binary) { \
|
||||
RAY_CHECK(binary.size() == Size() || binary.size() == 0) \
|
||||
<< "expected size is " << Size() << ", but got " << binary.size(); \
|
||||
std::memcpy(&id_, binary.data(), binary.size()); \
|
||||
} \
|
||||
#define DEFINE_UNIQUE_ID(type) \
|
||||
class RAY_EXPORT type : public UniqueID { \
|
||||
public: \
|
||||
explicit type(const UniqueID &from) { \
|
||||
std::memcpy(&id_, from.Data(), kUniqueIDSize); \
|
||||
} \
|
||||
type() : UniqueID() {} \
|
||||
static type FromRandom() { return type(UniqueID::FromRandom()); } \
|
||||
static type FromBinary(const std::string &binary) { return type(binary); } \
|
||||
static type Nil() { return type(UniqueID::Nil()); } \
|
||||
static size_t Size() { return kUniqueIDSize; } \
|
||||
\
|
||||
private: \
|
||||
explicit type(const std::string &binary) { \
|
||||
RAY_CHECK(binary.size() == Size() || binary.size() == 0) \
|
||||
<< "expected size is " << Size() << ", but got data " << binary << " of size " \
|
||||
<< binary.size(); \
|
||||
std::memcpy(&id_, binary.data(), binary.size()); \
|
||||
} \
|
||||
};
|
||||
|
||||
#include "ray/common/id_def.h"
|
||||
@@ -385,7 +387,8 @@ T BaseID<T>::FromRandom() {
|
||||
template <typename T>
|
||||
T BaseID<T>::FromBinary(const std::string &binary) {
|
||||
RAY_CHECK(binary.size() == T::Size() || binary.size() == 0)
|
||||
<< "expected size is " << T::Size() << ", but got " << binary.size();
|
||||
<< "expected size is " << T::Size() << ", but got data " << binary << " of size "
|
||||
<< binary.size();
|
||||
T t;
|
||||
std::memcpy(t.MutableData(), binary.data(), binary.size());
|
||||
return t;
|
||||
|
||||
@@ -120,7 +120,7 @@ ObjectID TaskSpecification::ReturnId(size_t return_index) const {
|
||||
}
|
||||
|
||||
bool TaskSpecification::ArgByRef(size_t arg_index) const {
|
||||
return message_->args(arg_index).object_ref().object_id() != "";
|
||||
return message_->args(arg_index).has_object_ref();
|
||||
}
|
||||
|
||||
ObjectID TaskSpecification::ArgId(size_t arg_index) const {
|
||||
|
||||
@@ -41,7 +41,7 @@ class TaskArgByValue : public TaskArg {
|
||||
///
|
||||
/// \param[in] value Value of the argument.
|
||||
/// \return The task argument.
|
||||
TaskArgByValue(const std::shared_ptr<RayObject> &value) : value_(value) {
|
||||
explicit TaskArgByValue(const std::shared_ptr<RayObject> &value) : value_(value) {
|
||||
RAY_CHECK(value) << "Value can't be null.";
|
||||
}
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ void InlineDependencies(
|
||||
if (!it->second->IsInPlasmaError()) {
|
||||
// The object has not been promoted to plasma. Inline the object by
|
||||
// clearing the reference and replacing it with the raw value.
|
||||
mutable_arg->mutable_object_ref()->Clear();
|
||||
mutable_arg->clear_object_ref();
|
||||
if (it->second->HasData()) {
|
||||
const auto &data = it->second->GetData();
|
||||
mutable_arg->set_data(data->Data(), data->Size());
|
||||
|
||||
@@ -21,9 +21,11 @@
|
||||
*ray*TaskID*
|
||||
*ray*ActorID*
|
||||
*ray*ObjectID*
|
||||
*ray*ObjectReference*
|
||||
# Others
|
||||
*ray*CoreWorker*
|
||||
*PyInit*
|
||||
*init_raylet*
|
||||
*Java*
|
||||
*JNI_*
|
||||
*ray*streaming*
|
||||
|
||||
@@ -23,11 +23,13 @@ VERSION_1.0 {
|
||||
*ray*TaskID*;
|
||||
*ray*ActorID*;
|
||||
*ray*ObjectID*;
|
||||
*ray*ObjectReference*;
|
||||
# Others
|
||||
*ray*CoreWorker*;
|
||||
*PyInit*;
|
||||
*init_raylet*;
|
||||
*Java*;
|
||||
*JNI_*;
|
||||
*ray*streaming*;
|
||||
local: *;
|
||||
};
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
#include "ray/streaming/streaming.h"
|
||||
#include "ray/core_worker/core_worker.h"
|
||||
|
||||
namespace ray {
|
||||
namespace streaming {
|
||||
|
||||
void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffer> buffer,
|
||||
RayFunction &function, int return_num,
|
||||
std::vector<ObjectID> &return_ids) {
|
||||
std::unordered_map<std::string, double> resources;
|
||||
std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
|
||||
TaskOptions options{name, return_num, resources};
|
||||
|
||||
char meta_data[3] = {'R', 'A', 'W'};
|
||||
std::shared_ptr<LocalMemoryBuffer> meta =
|
||||
std::make_shared<LocalMemoryBuffer>((uint8_t *)meta_data, 3, true);
|
||||
|
||||
std::vector<std::unique_ptr<TaskArg>> args;
|
||||
if (function.GetLanguage() == Language::PYTHON) {
|
||||
auto dummy = "__RAY_DUMMY__";
|
||||
std::shared_ptr<LocalMemoryBuffer> dummyBuffer =
|
||||
std::make_shared<LocalMemoryBuffer>((uint8_t *)dummy, 13, true);
|
||||
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
|
||||
std::move(dummyBuffer), meta, std::vector<ObjectID>(), true)));
|
||||
}
|
||||
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
|
||||
std::move(buffer), meta, std::vector<ObjectID>(), true)));
|
||||
|
||||
std::vector<std::shared_ptr<RayObject>> results;
|
||||
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function, args,
|
||||
options, &return_ids);
|
||||
}
|
||||
} // namespace streaming
|
||||
} // namespace ray
|
||||
@@ -0,0 +1,21 @@
|
||||
#pragma once
|
||||
#include "ray/common/buffer.h"
|
||||
#include "ray/common/id.h"
|
||||
#include "ray/core_worker/common.h"
|
||||
|
||||
// This header is used to warp some streaming code so we can reduce suspicious
|
||||
// symbols export.
|
||||
namespace ray {
|
||||
namespace streaming {
|
||||
|
||||
/// Send buffer internal
|
||||
/// \param[in] buffer buffer to be sent.
|
||||
/// \param[in] function the function descriptor of peer's function.
|
||||
/// \param[in] return_num return value number of the call.
|
||||
/// \param[out] return_ids return ids from SubmitActorTask.
|
||||
void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffer> buffer,
|
||||
RayFunction &function, int return_num,
|
||||
std::vector<ObjectID> &return_ids);
|
||||
|
||||
} // namespace streaming
|
||||
} // namespace ray
|
||||
@@ -67,6 +67,13 @@ cc_binary(
|
||||
deps = ["//:core_worker_lib"],
|
||||
)
|
||||
|
||||
cc_binary(
|
||||
name = "exported_streaming_internal.so",
|
||||
copts = COPTS,
|
||||
linkshared = 1,
|
||||
deps = ["//:exported_streaming_internal"],
|
||||
)
|
||||
|
||||
cc_library(
|
||||
name = "streaming_util",
|
||||
srcs = glob([
|
||||
@@ -146,6 +153,7 @@ cc_library(
|
||||
],
|
||||
"//conditions:default": [
|
||||
"core_worker_lib.so",
|
||||
"exported_streaming_internal.so",
|
||||
],
|
||||
}),
|
||||
)
|
||||
@@ -233,6 +241,7 @@ cc_library(
|
||||
)
|
||||
|
||||
test_common_deps = [
|
||||
"//:exported_streaming_internal",
|
||||
":streaming_lib",
|
||||
"//:ray_common",
|
||||
"//:ray_util",
|
||||
@@ -348,6 +357,7 @@ genrule(
|
||||
GENERATED_DIR="streaming/python/generated"
|
||||
mkdir -p "$$GENERATED_DIR"
|
||||
touch "$$GENERATED_DIR/__init__.py"
|
||||
# Use this `sed` command to change the import path in the generated file.
|
||||
sed -i -E 's/from streaming.src.protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py"
|
||||
sed -i -E 's/from protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py"
|
||||
date > $@
|
||||
|
||||
+23
@@ -5,6 +5,8 @@ import io.ray.runtime.util.JniUtils;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -37,4 +39,25 @@ public class EnvUtil {
|
||||
JniUtils.loadLibrary("streaming_java");
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute an external command.
|
||||
*
|
||||
* @return Whether the command succeeded.
|
||||
*/
|
||||
public static boolean executeCommand(List<String> command, int waitTimeoutSeconds) {
|
||||
try {
|
||||
ProcessBuilder processBuilder = new ProcessBuilder(command)
|
||||
.redirectOutput(ProcessBuilder.Redirect.INHERIT)
|
||||
.redirectError(ProcessBuilder.Redirect.INHERIT);
|
||||
Process process = processBuilder.start();
|
||||
boolean exit = process.waitFor(waitTimeoutSeconds, TimeUnit.SECONDS);
|
||||
if (!exit) {
|
||||
process.destroyForcibly();
|
||||
}
|
||||
return process.exitValue() == 0;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error executing command " + String.join(" ", command), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
-1
@@ -1,7 +1,6 @@
|
||||
package io.ray.streaming.runtime.util;
|
||||
|
||||
import com.sun.management.OperatingSystemMXBean;
|
||||
import io.ray.api.id.UniqueId;
|
||||
import io.ray.streaming.runtime.core.resource.Container;
|
||||
import io.ray.streaming.runtime.core.resource.ContainerId;
|
||||
import java.io.BufferedInputStream;
|
||||
|
||||
+5
@@ -1,11 +1,14 @@
|
||||
package io.ray.streaming.runtime.demo;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.ray.api.Ray;
|
||||
import io.ray.streaming.api.context.StreamingContext;
|
||||
import io.ray.streaming.api.function.impl.FilterFunction;
|
||||
import io.ray.streaming.api.function.impl.MapFunction;
|
||||
import io.ray.streaming.api.function.impl.SinkFunction;
|
||||
import io.ray.streaming.api.stream.DataStreamSource;
|
||||
import io.ray.streaming.runtime.util.EnvUtil;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
@@ -42,6 +45,8 @@ public class HybridStreamTest {
|
||||
@Test(timeOut = 60000)
|
||||
public void testHybridDataStream() throws Exception {
|
||||
Ray.shutdown();
|
||||
Preconditions.checkArgument(
|
||||
EnvUtil.executeCommand(ImmutableList.of("ray", "stop"), 5));
|
||||
String sinkFileName = "/tmp/testHybridDataStream.txt";
|
||||
Files.deleteIfExists(Paths.get(sinkFileName));
|
||||
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
import ray
|
||||
from ray.streaming import StreamingContext
|
||||
import subprocess
|
||||
import os
|
||||
from ray.test_utils import wait_for_condition
|
||||
|
||||
|
||||
def map_func1(x):
|
||||
@@ -32,9 +34,9 @@ def test_hybrid_stream():
|
||||
print("java_worker_options", java_worker_options)
|
||||
assert not ray.is_initialized()
|
||||
ray.init(
|
||||
load_code_from_local=True,
|
||||
include_java=True,
|
||||
java_worker_options=java_worker_options,
|
||||
_load_code_from_local=True,
|
||||
_include_java=True,
|
||||
_java_worker_options=java_worker_options,
|
||||
_system_config={"num_workers_per_process_java": 1})
|
||||
|
||||
sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt"
|
||||
@@ -45,6 +47,7 @@ def test_hybrid_stream():
|
||||
print("HybridStreamTest", x)
|
||||
with open(sink_file, "a") as f:
|
||||
f.write(str(x))
|
||||
f.flush()
|
||||
|
||||
ctx = StreamingContext.Builder().build()
|
||||
ctx.from_values("a", "b", "c") \
|
||||
@@ -54,14 +57,23 @@ def test_hybrid_stream():
|
||||
.as_python_stream() \
|
||||
.sink(sink_func)
|
||||
ctx.submit("HybridStreamTest")
|
||||
import time
|
||||
time.sleep(3)
|
||||
|
||||
def check_succeed():
|
||||
if os.path.exists(sink_file):
|
||||
import time
|
||||
time.sleep(3) # Wait all data be written
|
||||
with open(sink_file, "r") as f:
|
||||
result = f.read()
|
||||
assert "a" in result
|
||||
assert "b" not in result
|
||||
assert "c" in result
|
||||
print("Execution succeed")
|
||||
return True
|
||||
return False
|
||||
|
||||
wait_for_condition(check_succeed, timeout=60, retry_interval_ms=1000)
|
||||
print("Execution succeed")
|
||||
ray.shutdown()
|
||||
with open(sink_file, "r") as f:
|
||||
result = f.read()
|
||||
assert "a" in result
|
||||
assert "b" not in result
|
||||
assert "c" in result
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#include "queue/transport.h"
|
||||
|
||||
#include "queue/utils.h"
|
||||
#include "ray/streaming/streaming.h"
|
||||
|
||||
namespace ray {
|
||||
namespace streaming {
|
||||
@@ -8,43 +9,18 @@ namespace streaming {
|
||||
static constexpr int TASK_OPTION_RETURN_NUM_0 = 0;
|
||||
static constexpr int TASK_OPTION_RETURN_NUM_1 = 1;
|
||||
|
||||
void Transport::SendInternal(std::shared_ptr<LocalMemoryBuffer> buffer,
|
||||
RayFunction &function, int return_num,
|
||||
std::vector<ObjectID> &return_ids) {
|
||||
std::unordered_map<std::string, double> resources;
|
||||
std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
|
||||
TaskOptions options{name, return_num, resources};
|
||||
|
||||
char meta_data[3] = {'R', 'A', 'W'};
|
||||
std::shared_ptr<LocalMemoryBuffer> meta =
|
||||
std::make_shared<LocalMemoryBuffer>((uint8_t *)meta_data, 3, true);
|
||||
|
||||
std::vector<std::unique_ptr<TaskArg>> args;
|
||||
if (function.GetLanguage() == Language::PYTHON) {
|
||||
auto dummy = "__RAY_DUMMY__";
|
||||
std::shared_ptr<LocalMemoryBuffer> dummyBuffer =
|
||||
std::make_shared<LocalMemoryBuffer>((uint8_t *)dummy, 13, true);
|
||||
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
|
||||
std::move(dummyBuffer), meta, std::vector<ObjectID>(), true)));
|
||||
}
|
||||
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
|
||||
std::move(buffer), meta, std::vector<ObjectID>(), true)));
|
||||
|
||||
std::vector<std::shared_ptr<RayObject>> results;
|
||||
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id_, function, args,
|
||||
options, &return_ids);
|
||||
}
|
||||
|
||||
void Transport::Send(std::shared_ptr<LocalMemoryBuffer> buffer) {
|
||||
STREAMING_LOG(DEBUG) << "Transport::Send buffer size: " << buffer->Size();
|
||||
std::vector<ObjectID> return_ids;
|
||||
SendInternal(std::move(buffer), async_func_, TASK_OPTION_RETURN_NUM_0, return_ids);
|
||||
ray::streaming::SendInternal(peer_actor_id_, std::move(buffer), async_func_,
|
||||
TASK_OPTION_RETURN_NUM_0, return_ids);
|
||||
}
|
||||
|
||||
std::shared_ptr<LocalMemoryBuffer> Transport::SendForResult(
|
||||
std::shared_ptr<LocalMemoryBuffer> buffer, int64_t timeout_ms) {
|
||||
std::vector<ObjectID> return_ids;
|
||||
SendInternal(buffer, sync_func_, TASK_OPTION_RETURN_NUM_1, return_ids);
|
||||
ray::streaming::SendInternal(peer_actor_id_, buffer, sync_func_,
|
||||
TASK_OPTION_RETURN_NUM_1, return_ids);
|
||||
|
||||
std::vector<std::shared_ptr<RayObject>> results;
|
||||
Status get_st =
|
||||
|
||||
@@ -47,16 +47,6 @@ class Transport {
|
||||
std::shared_ptr<LocalMemoryBuffer> SendForResultWithRetry(
|
||||
std::shared_ptr<LocalMemoryBuffer> buffer, int retry_cnt, int64_t timeout_ms);
|
||||
|
||||
private:
|
||||
/// Send buffer internal
|
||||
/// \param[in] buffer buffer to be sent.
|
||||
/// \param[in] function the function descriptor of peer's function.
|
||||
/// \param[in] return_num return value number of the call.
|
||||
/// \param[out] return_ids return ids from SubmitActorTask.
|
||||
virtual void SendInternal(std::shared_ptr<LocalMemoryBuffer> buffer,
|
||||
RayFunction &function, int return_num,
|
||||
std::vector<ObjectID> &return_ids);
|
||||
|
||||
private:
|
||||
WorkerID worker_id_;
|
||||
ActorID peer_actor_id_;
|
||||
|
||||
Reference in New Issue
Block a user