[Streaming] Fix streaming ci (#10665)

This commit is contained in:
chaokunyang
2020-09-09 16:53:43 +08:00
committed by GitHub
parent 72ac85c19e
commit ccf27a9ad2
16 changed files with 178 additions and 75 deletions
+25
View File
@@ -367,6 +367,29 @@ cc_library(
],
)
# This library is used to wrap some streaming code so that we can reduce the
# export of suspicious symbols.
cc_library(
name = "exported_streaming_internal",
srcs = glob(
[
"src/ray/streaming/streaming.cc",
],
),
hdrs = glob(
[
"src/ray/streaming/streaming.h",
],
),
copts = COPTS,
strip_include_prefix = "src",
visibility = ["//visibility:public"],
deps = [
":core_worker_lib",
],
alwayslink = 1,
)
cc_binary(
name = "raylet",
srcs = ["src/ray/raylet/main.cc"],
@@ -1702,6 +1725,7 @@ pyx_library(
),
deps = [
"//:core_worker_lib",
"//:exported_streaming_internal",
"//:global_state_accessor_lib",
"//:ray_util",
"//:raylet_lib",
@@ -1756,6 +1780,7 @@ cc_binary(
visibility = ["//java:__subpackages__"],
deps = [
"//:core_worker_lib",
"//:exported_streaming_internal",
"//:global_state_accessor_lib",
"//:src/ray/ray_exported_symbols.lds",
"//:src/ray/ray_version_script.lds",
+23 -20
View File
@@ -75,7 +75,8 @@ class BaseID {
protected:
BaseID(const std::string &binary) {
RAY_CHECK(binary.size() == Size() || binary.size() == 0)
<< "expected size is " << Size() << ", but got " << binary.size();
<< "expected size is " << Size() << ", but got data " << binary << " of size "
<< binary.size();
std::memcpy(const_cast<uint8_t *>(this->Data()), binary.data(), binary.size());
}
// All IDs are immutable for hash evaluations. MutableData is only allow to use
@@ -341,24 +342,25 @@ std::ostream &operator<<(std::ostream &os, const TaskID &id);
std::ostream &operator<<(std::ostream &os, const ObjectID &id);
std::ostream &operator<<(std::ostream &os, const PlacementGroupID &id);
#define DEFINE_UNIQUE_ID(type) \
class RAY_EXPORT type : public UniqueID { \
public: \
explicit type(const UniqueID &from) { \
std::memcpy(&id_, from.Data(), kUniqueIDSize); \
} \
type() : UniqueID() {} \
static type FromRandom() { return type(UniqueID::FromRandom()); } \
static type FromBinary(const std::string &binary) { return type(binary); } \
static type Nil() { return type(UniqueID::Nil()); } \
static size_t Size() { return kUniqueIDSize; } \
\
private: \
explicit type(const std::string &binary) { \
RAY_CHECK(binary.size() == Size() || binary.size() == 0) \
<< "expected size is " << Size() << ", but got " << binary.size(); \
std::memcpy(&id_, binary.data(), binary.size()); \
} \
#define DEFINE_UNIQUE_ID(type) \
class RAY_EXPORT type : public UniqueID { \
public: \
explicit type(const UniqueID &from) { \
std::memcpy(&id_, from.Data(), kUniqueIDSize); \
} \
type() : UniqueID() {} \
static type FromRandom() { return type(UniqueID::FromRandom()); } \
static type FromBinary(const std::string &binary) { return type(binary); } \
static type Nil() { return type(UniqueID::Nil()); } \
static size_t Size() { return kUniqueIDSize; } \
\
private: \
explicit type(const std::string &binary) { \
RAY_CHECK(binary.size() == Size() || binary.size() == 0) \
<< "expected size is " << Size() << ", but got data " << binary << " of size " \
<< binary.size(); \
std::memcpy(&id_, binary.data(), binary.size()); \
} \
};
#include "ray/common/id_def.h"
@@ -385,7 +387,8 @@ T BaseID<T>::FromRandom() {
template <typename T>
T BaseID<T>::FromBinary(const std::string &binary) {
RAY_CHECK(binary.size() == T::Size() || binary.size() == 0)
<< "expected size is " << T::Size() << ", but got " << binary.size();
<< "expected size is " << T::Size() << ", but got data " << binary << " of size "
<< binary.size();
T t;
std::memcpy(t.MutableData(), binary.data(), binary.size());
return t;
+1 -1
View File
@@ -120,7 +120,7 @@ ObjectID TaskSpecification::ReturnId(size_t return_index) const {
}
bool TaskSpecification::ArgByRef(size_t arg_index) const {
return message_->args(arg_index).object_ref().object_id() != "";
return message_->args(arg_index).has_object_ref();
}
ObjectID TaskSpecification::ArgId(size_t arg_index) const {
+1 -1
View File
@@ -41,7 +41,7 @@ class TaskArgByValue : public TaskArg {
///
/// \param[in] value Value of the argument.
/// \return The task argument.
TaskArgByValue(const std::shared_ptr<RayObject> &value) : value_(value) {
explicit TaskArgByValue(const std::shared_ptr<RayObject> &value) : value_(value) {
RAY_CHECK(value) << "Value can't be null.";
}
@@ -46,7 +46,7 @@ void InlineDependencies(
if (!it->second->IsInPlasmaError()) {
// The object has not been promoted to plasma. Inline the object by
// clearing the reference and replacing it with the raw value.
mutable_arg->mutable_object_ref()->Clear();
mutable_arg->clear_object_ref();
if (it->second->HasData()) {
const auto &data = it->second->GetData();
mutable_arg->set_data(data->Data(), data->Size());
+2
View File
@@ -21,9 +21,11 @@
*ray*TaskID*
*ray*ActorID*
*ray*ObjectID*
*ray*ObjectReference*
# Others
*ray*CoreWorker*
*PyInit*
*init_raylet*
*Java*
*JNI_*
*ray*streaming*
+2
View File
@@ -23,11 +23,13 @@ VERSION_1.0 {
*ray*TaskID*;
*ray*ActorID*;
*ray*ObjectID*;
*ray*ObjectReference*;
# Others
*ray*CoreWorker*;
*PyInit*;
*init_raylet*;
*Java*;
*JNI_*;
*ray*streaming*;
local: *;
};
+34
View File
@@ -0,0 +1,34 @@
#include "ray/streaming/streaming.h"
#include "ray/core_worker/core_worker.h"
namespace ray {
namespace streaming {
void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num,
std::vector<ObjectID> &return_ids) {
std::unordered_map<std::string, double> resources;
std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
TaskOptions options{name, return_num, resources};
char meta_data[3] = {'R', 'A', 'W'};
std::shared_ptr<LocalMemoryBuffer> meta =
std::make_shared<LocalMemoryBuffer>((uint8_t *)meta_data, 3, true);
std::vector<std::unique_ptr<TaskArg>> args;
if (function.GetLanguage() == Language::PYTHON) {
auto dummy = "__RAY_DUMMY__";
std::shared_ptr<LocalMemoryBuffer> dummyBuffer =
std::make_shared<LocalMemoryBuffer>((uint8_t *)dummy, 13, true);
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
std::move(dummyBuffer), meta, std::vector<ObjectID>(), true)));
}
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
std::move(buffer), meta, std::vector<ObjectID>(), true)));
std::vector<std::shared_ptr<RayObject>> results;
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id, function, args,
options, &return_ids);
}
} // namespace streaming
} // namespace ray
+21
View File
@@ -0,0 +1,21 @@
#pragma once
#include "ray/common/buffer.h"
#include "ray/common/id.h"
#include "ray/core_worker/common.h"
// This header is used to wrap some streaming code so that we can reduce the
// export of suspicious symbols.
namespace ray {
namespace streaming {
/// Send buffer internal
/// \param[in] peer_actor_id id of the actor that the task is submitted to.
/// \param[in] buffer buffer to be sent.
/// \param[in] function the function descriptor of peer's function.
/// \param[in] return_num return value number of the call.
/// \param[out] return_ids return ids from SubmitActorTask.
void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num,
std::vector<ObjectID> &return_ids);
} // namespace streaming
} // namespace ray
+11
View File
@@ -67,6 +67,13 @@ cc_binary(
deps = ["//:core_worker_lib"],
)
# Shared-library build (linkshared = 1) of //:exported_streaming_internal.
cc_binary(
name = "exported_streaming_internal.so",
copts = COPTS,
linkshared = 1,
deps = ["//:exported_streaming_internal"],
)
cc_library(
name = "streaming_util",
srcs = glob([
@@ -143,9 +150,11 @@ cc_library(
"@bazel_tools//src/conditions:windows": [
# TODO(mehrdadn): This is to resolve symbols on Windows for now. Should remove this later. (See d7f8d18.)
"//:core_worker_lib",
"//:exported_streaming_internal",
],
"//conditions:default": [
"core_worker_lib.so",
"exported_streaming_internal.so",
],
}),
)
@@ -233,6 +242,7 @@ cc_library(
)
test_common_deps = [
"//:exported_streaming_internal",
":streaming_lib",
"//:ray_common",
"//:ray_util",
@@ -348,6 +358,7 @@ genrule(
GENERATED_DIR="streaming/python/generated"
mkdir -p "$$GENERATED_DIR"
touch "$$GENERATED_DIR/__init__.py"
# Use this `sed` command to change the import path in the generated file.
sed -i -E 's/from streaming.src.protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py"
sed -i -E 's/from protobuf/from ./' "$$GENERATED_DIR/remote_call_pb2.py"
date > $@
@@ -5,6 +5,8 @@ import io.ray.runtime.util.JniUtils;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,4 +39,25 @@ public class EnvUtil {
JniUtils.loadLibrary("streaming_java");
}
/**
 * Execute an external command, forwarding its stdout and stderr to this process.
 *
 * @param command The program to run followed by its arguments.
 * @param waitTimeoutSeconds Maximum time to wait for the command to exit, in seconds.
 * @return Whether the command exited with code 0 within the timeout. A command that
 *     does not exit in time is forcibly destroyed and reported as failed.
 * @throws RuntimeException If the command cannot be started or the wait is interrupted.
 */
public static boolean executeCommand(List<String> command, int waitTimeoutSeconds) {
  try {
    Process process = new ProcessBuilder(command)
        .redirectOutput(ProcessBuilder.Redirect.INHERIT)
        .redirectError(ProcessBuilder.Redirect.INHERIT)
        .start();
    boolean exited = process.waitFor(waitTimeoutSeconds, TimeUnit.SECONDS);
    if (!exited) {
      // destroyForcibly() does not wait for the process to terminate, so calling
      // exitValue() right after it can throw IllegalThreadStateException. A command
      // that timed out did not succeed, so report the failure directly.
      process.destroyForcibly();
      return false;
    }
    return process.exitValue() == 0;
  } catch (InterruptedException e) {
    // Keep the interrupt status visible to callers further up the stack.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Error executing command " + String.join(" ", command), e);
  } catch (Exception e) {
    throw new RuntimeException("Error executing command " + String.join(" ", command), e);
  }
}
}
@@ -1,7 +1,6 @@
package io.ray.streaming.runtime.util;
import com.sun.management.OperatingSystemMXBean;
import io.ray.api.id.UniqueId;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.core.resource.ContainerId;
import java.io.BufferedInputStream;
@@ -1,11 +1,14 @@
package io.ray.streaming.runtime.demo;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.ray.api.Ray;
import io.ray.streaming.api.context.StreamingContext;
import io.ray.streaming.api.function.impl.FilterFunction;
import io.ray.streaming.api.function.impl.MapFunction;
import io.ray.streaming.api.function.impl.SinkFunction;
import io.ray.streaming.api.stream.DataStreamSource;
import io.ray.streaming.runtime.util.EnvUtil;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -42,6 +45,8 @@ public class HybridStreamTest {
@Test(timeOut = 60000)
public void testHybridDataStream() throws Exception {
Ray.shutdown();
Preconditions.checkArgument(
EnvUtil.executeCommand(ImmutableList.of("ray", "stop"), 5));
String sinkFileName = "/tmp/testHybridDataStream.txt";
Files.deleteIfExists(Paths.get(sinkFileName));
+24 -12
View File
@@ -1,8 +1,10 @@
import json
import os
import subprocess
import ray
from ray.streaming import StreamingContext
import subprocess
import os
from ray.test_utils import wait_for_condition
def map_func1(x):
@@ -32,9 +34,9 @@ def test_hybrid_stream():
print("java_worker_options", java_worker_options)
assert not ray.is_initialized()
ray.init(
load_code_from_local=True,
include_java=True,
java_worker_options=java_worker_options,
_load_code_from_local=True,
_include_java=True,
_java_worker_options=java_worker_options,
_system_config={"num_workers_per_process_java": 1})
sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt"
@@ -45,6 +47,7 @@ def test_hybrid_stream():
print("HybridStreamTest", x)
with open(sink_file, "a") as f:
f.write(str(x))
f.flush()
ctx = StreamingContext.Builder().build()
ctx.from_values("a", "b", "c") \
@@ -54,14 +57,23 @@ def test_hybrid_stream():
.as_python_stream() \
.sink(sink_func)
ctx.submit("HybridStreamTest")
import time
time.sleep(3)
def check_succeed():
if os.path.exists(sink_file):
import time
time.sleep(3) # Wait all data be written
with open(sink_file, "r") as f:
result = f.read()
assert "a" in result
assert "b" not in result
assert "c" in result
print("Execution succeed")
return True
return False
wait_for_condition(check_succeed, timeout=60, retry_interval_ms=1000)
print("Execution succeed")
ray.shutdown()
with open(sink_file, "r") as f:
result = f.read()
assert "a" in result
assert "b" not in result
assert "c" in result
if __name__ == "__main__":
+5 -29
View File
@@ -1,6 +1,7 @@
#include "queue/transport.h"
#include "queue/utils.h"
#include "ray/streaming/streaming.h"
namespace ray {
namespace streaming {
@@ -8,43 +9,18 @@ namespace streaming {
static constexpr int TASK_OPTION_RETURN_NUM_0 = 0;
static constexpr int TASK_OPTION_RETURN_NUM_1 = 1;
void Transport::SendInternal(std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num,
std::vector<ObjectID> &return_ids) {
std::unordered_map<std::string, double> resources;
std::string name = function.GetFunctionDescriptor()->DefaultTaskName();
TaskOptions options{name, return_num, resources};
char meta_data[3] = {'R', 'A', 'W'};
std::shared_ptr<LocalMemoryBuffer> meta =
std::make_shared<LocalMemoryBuffer>((uint8_t *)meta_data, 3, true);
std::vector<std::unique_ptr<TaskArg>> args;
if (function.GetLanguage() == Language::PYTHON) {
auto dummy = "__RAY_DUMMY__";
std::shared_ptr<LocalMemoryBuffer> dummyBuffer =
std::make_shared<LocalMemoryBuffer>((uint8_t *)dummy, 13, true);
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
std::move(dummyBuffer), meta, std::vector<ObjectID>(), true)));
}
args.emplace_back(new TaskArgByValue(std::make_shared<RayObject>(
std::move(buffer), meta, std::vector<ObjectID>(), true)));
std::vector<std::shared_ptr<RayObject>> results;
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id_, function, args,
options, &return_ids);
}
void Transport::Send(std::shared_ptr<LocalMemoryBuffer> buffer) {
STREAMING_LOG(DEBUG) << "Transport::Send buffer size: " << buffer->Size();
std::vector<ObjectID> return_ids;
SendInternal(std::move(buffer), async_func_, TASK_OPTION_RETURN_NUM_0, return_ids);
ray::streaming::SendInternal(peer_actor_id_, std::move(buffer), async_func_,
TASK_OPTION_RETURN_NUM_0, return_ids);
}
std::shared_ptr<LocalMemoryBuffer> Transport::SendForResult(
std::shared_ptr<LocalMemoryBuffer> buffer, int64_t timeout_ms) {
std::vector<ObjectID> return_ids;
SendInternal(buffer, sync_func_, TASK_OPTION_RETURN_NUM_1, return_ids);
ray::streaming::SendInternal(peer_actor_id_, buffer, sync_func_,
TASK_OPTION_RETURN_NUM_1, return_ids);
std::vector<std::shared_ptr<RayObject>> results;
Status get_st =
-10
View File
@@ -47,16 +47,6 @@ class Transport {
std::shared_ptr<LocalMemoryBuffer> SendForResultWithRetry(
std::shared_ptr<LocalMemoryBuffer> buffer, int retry_cnt, int64_t timeout_ms);
private:
/// Send buffer internal
/// \param[in] buffer buffer to be sent.
/// \param[in] function the function descriptor of peer's function.
/// \param[in] return_num return value number of the call.
/// \param[out] return_ids return ids from SubmitActorTask.
virtual void SendInternal(std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num,
std::vector<ObjectID> &return_ids);
private:
WorkerID worker_id_;
ActorID peer_actor_id_;