This commit is contained in:
Ian Rodney
2020-08-25 10:07:20 -07:00
committed by GitHub
parent 2689fb439c
commit b14c56e599
11 changed files with 141 additions and 100 deletions
+3 -1
View File
@@ -195,7 +195,9 @@ class MockConsumer : public ConsumerChannel {
explicit MockConsumer(std::shared_ptr<Config> &transfer_config,
ConsumerChannelInfo &c_channel_info)
: ConsumerChannel(transfer_config, c_channel_info){};
TransferCreationStatus CreateTransferChannel() override { return TransferCreationStatus::PullOk; }
TransferCreationStatus CreateTransferChannel() override {
return TransferCreationStatus::PullOk;
}
StreamingStatus DestroyTransferChannel() override { return StreamingStatus::OK; }
StreamingStatus ClearTransferCheckpoint(uint64_t checkpoint_id,
uint64_t checkpoint_offset) override {
+8 -9
View File
@@ -12,8 +12,7 @@ std::unique_ptr<LocalMemoryBuffer> Message::ToBytes() {
int64_t fbs_length = pboutput.length();
queue::protobuf::StreamingQueueMessageType type = Type();
size_t total_len =
kItemHeaderSize + fbs_length;
size_t total_len = kItemHeaderSize + fbs_length;
if (buffer_ != nullptr) {
total_len += buffer_->Size();
}
@@ -45,7 +44,6 @@ std::unique_ptr<LocalMemoryBuffer> Message::ToBytes() {
return buffer;
}
void Message::FillMessageCommon(queue::protobuf::MessageCommon *common) {
common->set_src_actor_id(actor_id_.Binary());
common->set_dst_actor_id(peer_actor_id_.Binary());
@@ -82,8 +80,9 @@ std::shared_ptr<DataMessage> DataMessage::FromBytes(uint8_t *bytes) {
/// Copy data and create a new buffer for streaming queue.
std::shared_ptr<LocalMemoryBuffer> buffer =
std::make_shared<LocalMemoryBuffer>(bytes, (size_t)length, true);
std::shared_ptr<DataMessage> data_msg = std::make_shared<DataMessage>(
src_actor_id, dst_actor_id, queue_id, seq_id, msg_id_start, msg_id_end, buffer, raw);
std::shared_ptr<DataMessage> data_msg =
std::make_shared<DataMessage>(src_actor_id, dst_actor_id, queue_id, seq_id,
msg_id_start, msg_id_end, buffer, raw);
return data_msg;
}
@@ -209,10 +208,10 @@ std::shared_ptr<PullResponseMessage> PullResponseMessage::FromBytes(uint8_t *byt
bool is_upstream_first_pull = message.is_upstream_first_pull();
STREAMING_LOG(INFO) << "src_actor_id:" << src_actor_id
<< " dst_actor_id:" << dst_actor_id << " queue_id:" << queue_id
<< " seq_id: " << seq_id << " msg_id: " << msg_id << " err_code:"
<< queue::protobuf::StreamingQueueError_Name(err_code)
<< " is_upstream_first_pull: " << is_upstream_first_pull;
<< " dst_actor_id:" << dst_actor_id << " queue_id:" << queue_id
<< " seq_id: " << seq_id << " msg_id: " << msg_id << " err_code:"
<< queue::protobuf::StreamingQueueError_Name(err_code)
<< " is_upstream_first_pull: " << is_upstream_first_pull;
std::shared_ptr<PullResponseMessage> pull_rsp_msg =
std::make_shared<PullResponseMessage>(src_actor_id, dst_actor_id, queue_id, seq_id,
+7 -3
View File
@@ -45,6 +45,7 @@ class Message {
virtual void ToProtobuf(std::string *output) = 0;
void FillMessageCommon(queue::protobuf::MessageCommon *common);
protected:
ActorID actor_id_;
ActorID peer_actor_id_;
@@ -68,10 +69,13 @@ constexpr uint32_t kItemHeaderSize = kItemMetaHeaderSize + sizeof(uint64_t);
class DataMessage : public Message {
public:
DataMessage(const ActorID &actor_id, const ActorID &peer_actor_id, ObjectID queue_id,
uint64_t seq_id, uint64_t msg_id_start, uint64_t msg_id_end, std::shared_ptr<LocalMemoryBuffer> buffer, bool raw)
: Message(actor_id, peer_actor_id, queue_id, buffer), seq_id_(seq_id),
uint64_t seq_id, uint64_t msg_id_start, uint64_t msg_id_end,
std::shared_ptr<LocalMemoryBuffer> buffer, bool raw)
: Message(actor_id, peer_actor_id, queue_id, buffer),
seq_id_(seq_id),
msg_id_start_(msg_id_start),
msg_id_end_(msg_id_end),raw_(raw) {}
msg_id_end_(msg_id_end),
raw_(raw) {}
virtual ~DataMessage() {}
static std::shared_ptr<DataMessage> FromBytes(uint8_t *bytes);
+13 -9
View File
@@ -102,7 +102,8 @@ size_t Queue::PendingCount() {
}
Status WriterQueue::Push(uint64_t seq_id, uint8_t *buffer, uint32_t buffer_size,
uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end, bool raw) {
uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end,
bool raw) {
if (IsPendingFull(buffer_size)) {
return Status::OutOfMemory("Queue Push OutOfMemory");
}
@@ -122,8 +123,8 @@ Status WriterQueue::Push(uint64_t seq_id, uint8_t *buffer, uint32_t buffer_size,
void WriterQueue::Send() {
while (!IsPendingEmpty()) {
QueueItem item = PopPending();
DataMessage msg(actor_id_, peer_actor_id_, queue_id_, item.SeqId(), item.MsgIdStart(), item.MsgIdEnd(), item.Buffer(),
item.IsRaw());
DataMessage msg(actor_id_, peer_actor_id_, queue_id_, item.SeqId(), item.MsgIdStart(),
item.MsgIdEnd(), item.Buffer(), item.IsRaw());
std::unique_ptr<LocalMemoryBuffer> buffer = msg.ToBytes();
STREAMING_CHECK(transport_ != nullptr);
transport_->Send(std::move(buffer));
@@ -191,8 +192,10 @@ int WriterQueue::ResendItems(std::list<QueueItem>::iterator start_iter,
}
void WriterQueue::FindItem(
uint64_t target_msg_id, std::function<void()> greater_callback, std::function<void()> less_callback,
std::function<void(std::list<QueueItem>::iterator, uint64_t, uint64_t)> equal_callback) {
uint64_t target_msg_id, std::function<void()> greater_callback,
std::function<void()> less_callback,
std::function<void(std::list<QueueItem>::iterator, uint64_t, uint64_t)>
equal_callback) {
auto last_one = std::prev(watershed_iter_);
bool last_item_too_small =
last_one != buffer_queue_.end() && last_one->MsgIdEnd() < target_msg_id;
@@ -243,10 +246,11 @@ void WriterQueue::OnPull(
/// target_msg_id is too small.
[this, &pull_msg, &callback]() {
STREAMING_LOG(WARNING) << "Data lost.";
PullResponseMessage msg(
pull_msg->PeerActorId(), pull_msg->ActorId(), pull_msg->QueueId(),
QUEUE_INVALID_SEQ_ID, QUEUE_INVALID_SEQ_ID,
queue::protobuf::StreamingQueueError::DATA_LOST, is_upstream_first_pull_);
PullResponseMessage msg(pull_msg->PeerActorId(), pull_msg->ActorId(),
pull_msg->QueueId(), QUEUE_INVALID_SEQ_ID,
QUEUE_INVALID_SEQ_ID,
queue::protobuf::StreamingQueueError::DATA_LOST,
is_upstream_first_pull_);
std::unique_ptr<LocalMemoryBuffer> buffer = msg.ToBytes();
callback(std::move(buffer));
+21 -17
View File
@@ -31,14 +31,18 @@ class Queue {
/// \param actor_id, the actor id of upstream worker
/// \param peer_actor_id, the actor id of downstream worker
/// \param queue_id the unique identification of a pair of queues (upstream and
/// downstream).
/// \param size max size of the queue in bytes.
/// downstream).
/// \param size max size of the queue in bytes.
/// \param transport
/// transport to send items to peer.
Queue(const ActorID &actor_id, const ActorID &peer_actor_id, ObjectID queue_id, uint64_t size, std::shared_ptr<Transport> transport)
Queue(const ActorID &actor_id, const ActorID &peer_actor_id, ObjectID queue_id,
uint64_t size, std::shared_ptr<Transport> transport)
: actor_id_(actor_id),
peer_actor_id_(peer_actor_id),
queue_id_(queue_id), max_data_size_(size), data_size_(0), data_size_sent_(0) {
queue_id_(queue_id),
max_data_size_(size),
data_size_(0),
data_size_sent_(0) {
buffer_queue_.push_back(InvalidQueueItem());
watershed_iter_ = buffer_queue_.begin();
}
@@ -98,6 +102,7 @@ class Queue {
inline ActorID GetActorID() { return actor_id_; }
inline ActorID GetPeerActorID() { return peer_actor_id_; }
inline ObjectID GetQueueID() { return queue_id_; }
protected:
std::list<QueueItem> buffer_queue_;
std::list<QueueItem>::iterator watershed_iter_;
@@ -137,13 +142,12 @@ class WriterQueue : public Queue {
is_resending_(false),
is_upstream_first_pull_(true) {}
/// Push a continuous buffer into queue, the buffer consists of some messages packed by DataWriter.
/// \param data, the buffer address
/// \param data_size, buffer size
/// \param timestamp, the timestamp when the buffer pushed in
/// \param msg_id_start, the message id of the first message in the buffer
/// \param msg_id_end, the message id of the last message in the buffer
/// \param raw, whether this buffer is raw data, be True only in test
/// Push a continuous buffer into queue, the buffer consists of some messages packed by
/// DataWriter. \param data, the buffer address \param data_size, buffer size \param
/// timestamp, the timestamp when the buffer pushed in \param msg_id_start, the message
/// id of the first message in the buffer \param msg_id_end, the message id of the last
/// message in the buffer \param raw, whether this buffer is raw data, be True only in
/// test
Status Push(uint64_t seq_id, uint8_t *buffer, uint32_t buffer_size, uint64_t timestamp,
uint64_t msg_id_start, uint64_t msg_id_end, bool raw = false);
@@ -201,11 +205,10 @@ class WriterQueue : public Queue {
/// in the queue, the `less_callback` callback will be called; If the `target_msg_id` is
/// found in the queue, the `found_callback` callback willbe called.
/// \param target_msg_id, the target message id to be found.
void FindItem(
uint64_t target_msg_id,
std::function<void()> greater_callback,
std::function<void()> less_callback,
std::function<void(std::list<QueueItem>::iterator, uint64_t, uint64_t)> equal_callback);
void FindItem(uint64_t target_msg_id, std::function<void()> greater_callback,
std::function<void()> less_callback,
std::function<void(std::list<QueueItem>::iterator, uint64_t, uint64_t)>
equal_callback);
private:
ActorID actor_id_;
@@ -231,7 +234,8 @@ class ReaderQueue : public Queue {
/// NOTE: we do not restrict queue size of ReaderQueue
ReaderQueue(const ObjectID &queue_id, const ActorID &actor_id,
const ActorID &peer_actor_id, std::shared_ptr<Transport> transport)
: Queue(actor_id, peer_actor_id, queue_id, std::numeric_limits<uint64_t>::max(), transport),
: Queue(actor_id, peer_actor_id, queue_id, std::numeric_limits<uint64_t>::max(),
transport),
actor_id_(actor_id),
peer_actor_id_(peer_actor_id),
min_consumed_id_(QUEUE_INVALID_SEQ_ID),
+10 -12
View File
@@ -238,8 +238,8 @@ void UpstreamQueueMessageHandler::DispatchMessageInternal(
} else if (msg->Type() ==
queue::protobuf::StreamingQueueMessageType::StreamingQueueCheckRspMsgType) {
STREAMING_CHECK(false) << "Should not receive StreamingQueueCheckRspMsg";
} else if (msg->Type() ==
queue::protobuf::StreamingQueueMessageType::StreamingQueuePullRequestMsgType) {
} else if (msg->Type() == queue::protobuf::StreamingQueueMessageType::
StreamingQueuePullRequestMsgType) {
STREAMING_CHECK(callback) << "StreamingQueuePullRequestMsg "
<< " qid: " << msg->QueueId() << " actorid "
<< msg->ActorId()
@@ -330,8 +330,7 @@ std::shared_ptr<ReaderQueue> DownstreamQueueMessageHandler::CreateDownstreamQueu
StreamingQueueStatus DownstreamQueueMessageHandler::PullQueue(
const ObjectID &queue_id, uint64_t start_msg_id, bool &is_upstream_first_pull,
uint64_t timeout_ms) {
STREAMING_LOG(INFO) << "PullQueue queue_id: "
<< queue_id
STREAMING_LOG(INFO) << "PullQueue queue_id: " << queue_id
<< " start_msg_id: " << start_msg_id
<< " is_upstream_first_pull: " << is_upstream_first_pull;
uint64_t start_time = current_time_ms();
@@ -397,8 +396,8 @@ void DownstreamQueueMessageHandler::DispatchMessageInternal(
if (callback != nullptr) {
callback(check_result);
}
} else if (msg->Type() ==
queue::protobuf::StreamingQueueMessageType::StreamingQueueResendDataMsgType) {
} else if (msg->Type() == queue::protobuf::StreamingQueueMessageType::
StreamingQueueResendDataMsgType) {
auto queue = downstream_queues_.find(msg->QueueId());
if (queue == downstream_queues_.end()) {
std::shared_ptr<ResendDataMessage> data_msg =
@@ -456,14 +455,14 @@ StreamingQueueStatus DownstreamQueueMessageHandler::PullPeerAsync(
}
std::shared_ptr<Message> result_msg = ParseMessage(result_buffer);
STREAMING_CHECK(result_msg->Type() ==
queue::protobuf::StreamingQueueMessageType::StreamingQueuePullResponseMsgType);
STREAMING_CHECK(
result_msg->Type() ==
queue::protobuf::StreamingQueueMessageType::StreamingQueuePullResponseMsgType);
std::shared_ptr<PullResponseMessage> response_msg =
std::dynamic_pointer_cast<PullResponseMessage>(result_msg);
STREAMING_LOG(INFO) << "PullPeerAsync error: "
<< queue::protobuf::StreamingQueueError_Name(
response_msg->Error())
<< queue::protobuf::StreamingQueueError_Name(response_msg->Error())
<< " start_msg_id: " << start_msg_id;
is_upstream_first_pull = response_msg->IsUpstreamFirstPull();
@@ -471,8 +470,7 @@ StreamingQueueStatus DownstreamQueueMessageHandler::PullPeerAsync(
STREAMING_LOG(INFO) << "Set queue " << queue_id << " expect_seq_id to "
<< response_msg->SeqId();
return StreamingQueueStatus::OK;
} else if (response_msg->Error() ==
queue::protobuf::StreamingQueueError::DATA_LOST) {
} else if (response_msg->Error() == queue::protobuf::StreamingQueueError::DATA_LOST) {
return StreamingQueueStatus::DataLost;
} else if (response_msg->Error() ==
queue::protobuf::StreamingQueueError::NO_VALID_DATA) {
+9 -9
View File
@@ -40,8 +40,7 @@ class QueueMessageHandler {
/// Construct a QueueMessageHandler instance.
/// \param[in] actor_id actor id of current actor.
QueueMessageHandler(const ActorID &actor_id)
: actor_id_(actor_id), queue_dummy_work_(queue_service_) {
}
: actor_id_(actor_id), queue_dummy_work_(queue_service_) {}
virtual ~QueueMessageHandler() { Stop(); }
@@ -116,8 +115,8 @@ class QueueMessageHandler {
class UpstreamQueueMessageHandler : public QueueMessageHandler {
public:
/// Construct a UpstreamQueueMessageHandler instance.
UpstreamQueueMessageHandler(const ActorID &actor_id) : QueueMessageHandler(actor_id),
handler_service_dummy_worker_(handler_service_) {
UpstreamQueueMessageHandler(const ActorID &actor_id)
: QueueMessageHandler(actor_id), handler_service_dummy_worker_(handler_service_) {
Start();
}
/// Create a upstream queue.
@@ -153,6 +152,7 @@ class UpstreamQueueMessageHandler : public QueueMessageHandler {
const ActorID &actor_id);
static std::shared_ptr<UpstreamQueueMessageHandler> GetService();
virtual void Start() override;
private:
bool CheckQueueSync(const ObjectID &queue_ids);
virtual void Stop() override;
@@ -165,13 +165,13 @@ class UpstreamQueueMessageHandler : public QueueMessageHandler {
std::thread handle_service_thread_;
};
/// DownstreamQueueMessageHandler holds and manages all downstream queues of current actor.
/// DownstreamQueueMessageHandler holds and manages all downstream queues of current
/// actor.
class DownstreamQueueMessageHandler : public QueueMessageHandler {
public:
DownstreamQueueMessageHandler(const ActorID &actor_id)
: QueueMessageHandler(actor_id) {
Start();
}
DownstreamQueueMessageHandler(const ActorID &actor_id) : QueueMessageHandler(actor_id) {
Start();
}
/// Create a downstream queue.
/// \param queue_id, queue id of the queue to be created.
/// \param peer_actor_id, actor id of peer actor.
+13 -4
View File
@@ -33,14 +33,22 @@ class QueueItem {
/// \param[in] raw whether the data content is raw bytes, only used in some tests.
QueueItem(uint64_t seq_id, uint8_t *data, uint32_t data_size, uint64_t timestamp,
uint64_t msg_id_start, uint64_t msg_id_end, bool raw = false)
: seq_id_(seq_id), msg_id_start_(msg_id_start), msg_id_end_(msg_id_end),
: seq_id_(seq_id),
msg_id_start_(msg_id_start),
msg_id_end_(msg_id_end),
timestamp_(timestamp),
raw_(raw),
/*COPY*/ buffer_(std::make_shared<LocalMemoryBuffer>(data, data_size, true)) {}
QueueItem(uint64_t seq_id, std::shared_ptr<LocalMemoryBuffer> buffer,
uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end, bool raw = false)
: seq_id_(seq_id), msg_id_start_(msg_id_start), msg_id_end_(msg_id_end), timestamp_(timestamp), raw_(raw), buffer_(buffer) {}
uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end,
bool raw = false)
: seq_id_(seq_id),
msg_id_start_(msg_id_start),
msg_id_end_(msg_id_end),
timestamp_(timestamp),
raw_(raw),
buffer_(buffer) {}
QueueItem(std::shared_ptr<DataMessage> data_msg)
: seq_id_(data_msg->SeqId()),
@@ -112,7 +120,8 @@ class QueueItem {
class InvalidQueueItem : public QueueItem {
public:
InvalidQueueItem() : QueueItem(QUEUE_INVALID_SEQ_ID, data_, 1, 0, QUEUE_INVALID_SEQ_ID,
InvalidQueueItem()
: QueueItem(QUEUE_INVALID_SEQ_ID, data_, 1, 0, QUEUE_INVALID_SEQ_ID,
QUEUE_INVALID_SEQ_ID) {}
private:
+53 -32
View File
@@ -260,12 +260,12 @@ class StreamingQueueReaderTestSuite : public StreamingQueueTestSuite {
class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite {
public:
StreamingQueueUpStreamTestSuite(ActorID &peer_actor_id, std::vector<ObjectID> queue_ids,
std::vector<ObjectID> rescale_queue_ids)
std::vector<ObjectID> rescale_queue_ids)
: StreamingQueueTestSuite(peer_actor_id, queue_ids, rescale_queue_ids) {
test_func_map_ = {
{"pull_peer_async_test",
std::bind(&StreamingQueueUpStreamTestSuite::PullPeerAsyncTest, this)},
{"get_queue_test",
{"get_queue_test",
std::bind(&StreamingQueueUpStreamTestSuite::GetQueueTest, this)}};
}
@@ -274,11 +274,16 @@ class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
auto upstream_handler = ray::streaming::UpstreamQueueMessageHandler::GetService();
ObjectID &queue_id = queue_ids_[0];
RayFunction async_call_func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "reader_async_call_func", ""})};
RayFunction sync_call_func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "reader_sync_call_func", ""})};
upstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, sync_call_func);
RayFunction async_call_func{
ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(
ray::Language::PYTHON, {"", "", "reader_async_call_func", ""})};
RayFunction sync_call_func{
ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(
ray::Language::PYTHON, {"", "", "reader_sync_call_func", ""})};
upstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func,
sync_call_func);
upstream_handler->CreateUpstreamQueue(queue_id, peer_actor_id_, 10240);
STREAMING_LOG(INFO) << "IsQueueExist: "
<< upstream_handler->UpstreamQueueExists(queue_id);
@@ -296,11 +301,16 @@ class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
auto upstream_handler = ray::streaming::UpstreamQueueMessageHandler::GetService();
ObjectID &queue_id = queue_ids_[0];
RayFunction async_call_func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "reader_async_call_func", ""})};
RayFunction sync_call_func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "reader_sync_call_func", ""})};
upstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, sync_call_func);
RayFunction async_call_func{
ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(
ray::Language::PYTHON, {"", "", "reader_async_call_func", ""})};
RayFunction sync_call_func{
ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(
ray::Language::PYTHON, {"", "", "reader_sync_call_func", ""})};
upstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func,
sync_call_func);
std::shared_ptr<WriterQueue> queue =
upstream_handler->CreateUpstreamQueue(queue_id, peer_actor_id_, 10240);
STREAMING_LOG(INFO) << "IsQueueExist: "
@@ -313,8 +323,10 @@ class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite {
uint8_t data[100];
memset(data, msg_id, 100);
STREAMING_LOG(INFO) << "Writer User Push item msg_id: " << msg_id;
ASSERT_TRUE(
queue->Push(msg_id/*seqid*/, data, 100, current_sys_time_ms(), msg_id, msg_id, true).ok());
ASSERT_TRUE(queue
->Push(msg_id /*seqid*/, data, 100, current_sys_time_ms(), msg_id,
msg_id, true)
.ok());
queue->Send();
}
@@ -326,25 +338,30 @@ class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite {
class StreamingQueueDownStreamTestSuite : public StreamingQueueTestSuite {
public:
StreamingQueueDownStreamTestSuite(ActorID peer_actor_id, std::vector<ObjectID> queue_ids,
std::vector<ObjectID> rescale_queue_ids)
StreamingQueueDownStreamTestSuite(ActorID peer_actor_id,
std::vector<ObjectID> queue_ids,
std::vector<ObjectID> rescale_queue_ids)
: StreamingQueueTestSuite(peer_actor_id, queue_ids, rescale_queue_ids) {
test_func_map_ = {
{"pull_peer_async_test",
std::bind(&StreamingQueueDownStreamTestSuite::PullPeerAsyncTest, this)},
{"get_queue_test",
{"get_queue_test",
std::bind(&StreamingQueueDownStreamTestSuite::GetQueueTest, this)}};
};
void GetQueueTest() {
auto downstream_handler =
ray::streaming::DownstreamQueueMessageHandler::GetService();
auto downstream_handler = ray::streaming::DownstreamQueueMessageHandler::GetService();
ObjectID &queue_id = queue_ids_[0];
RayFunction async_call_func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "writer_async_call_func", ""})};
RayFunction sync_call_func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "writer_sync_call_func", ""})};
downstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, sync_call_func);
RayFunction async_call_func{
ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(
ray::Language::PYTHON, {"", "", "writer_async_call_func", ""})};
RayFunction sync_call_func{
ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(
ray::Language::PYTHON, {"", "", "writer_sync_call_func", ""})};
downstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func,
sync_call_func);
downstream_handler->CreateDownstreamQueue(queue_id, peer_actor_id_);
bool is_upstream_first_pull_ = false;
@@ -357,14 +374,18 @@ class StreamingQueueDownStreamTestSuite : public StreamingQueueTestSuite {
}
void PullPeerAsyncTest() {
auto downstream_handler =
ray::streaming::DownstreamQueueMessageHandler::GetService();
auto downstream_handler = ray::streaming::DownstreamQueueMessageHandler::GetService();
ObjectID &queue_id = queue_ids_[0];
RayFunction async_call_func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "writer_async_call_func", ""})};
RayFunction sync_call_func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "writer_sync_call_func", ""})};
downstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, sync_call_func);
RayFunction async_call_func{
ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(
ray::Language::PYTHON, {"", "", "writer_async_call_func", ""})};
RayFunction sync_call_func{
ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::FromVector(
ray::Language::PYTHON, {"", "", "writer_sync_call_func", ""})};
downstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func,
sync_call_func);
std::shared_ptr<ReaderQueue> queue =
downstream_handler->CreateDownstreamQueue(queue_id, peer_actor_id_);
@@ -511,7 +532,7 @@ class StreamingWorker {
ray::FunctionDescriptorType::kPythonFunctionDescriptor);
auto typed_descriptor = function_descriptor->As<ray::PythonFunctionDescriptor>();
STREAMING_LOG(DEBUG) << "StreamingWorker::ExecuteTask "
<< typed_descriptor->ToString();
<< typed_descriptor->ToString();
std::string func_name = typed_descriptor->FunctionName();
if (func_name == "init") {
+3 -2
View File
@@ -12,9 +12,10 @@ TEST(ProtoBufTest, MessageCommonTest) {
ray::ActorID actor_id = ray::ActorID::Of(job_id, task_id, 0);
ray::ActorID peer_actor_id = ray::ActorID::Of(job_id, task_id, 1);
ObjectID queue_id = ray::ObjectID::FromRandom();
uint8_t data[128];
std::shared_ptr<LocalMemoryBuffer> buffer = std::make_shared<LocalMemoryBuffer>(data, 128, true);
std::shared_ptr<LocalMemoryBuffer> buffer =
std::make_shared<LocalMemoryBuffer>(data, 128, true);
DataMessage msg(actor_id, peer_actor_id, queue_id, 100, 1000, 2000, buffer, true);
std::unique_ptr<LocalMemoryBuffer> serilized_buffer = msg.ToBytes();
std::shared_ptr<DataMessage> msg2 = DataMessage::FromBytes(serilized_buffer->Data());
+1 -2
View File
@@ -18,8 +18,7 @@ static int node_manager_port;
class StreamingQueueTest : public StreamingQueueTestBase {
public:
StreamingQueueTest()
: StreamingQueueTestBase(1, node_manager_port) {}
StreamingQueueTest() : StreamingQueueTestBase(1, node_manager_port) {}
};
class StreamingWriterTest : public StreamingQueueTestBase {