Files
ray/streaming/src/flow_control.cc
T
Lingxuan Zuo f995099e00 [Streaming] Support streaming flow control (#7152)
* streaming writer use event driven model.

* add RefreshChannelInfo

* fix name

* minor changes according to reviewer comments

* Fix according to reviewer's comments

* fix bazel lint

* code polished

* Add more comments

* rename Stop & Start of EventQueue to Freeze and Unfreeze.

* add override

* fix

* fix return value

* support flow control

* add flow control ut in mock transfer

* minor changes according to comments

* add java and python worker adaption

Co-authored-by: wanxing <wanxing.wwx@alibaba-inc.com>
2020-02-24 23:48:04 +08:00

36 lines
1.5 KiB
C++

#include "flow_control.h"
namespace ray {
namespace streaming {
// Constructs a flow controller that throttles by unconsumed sequence ids.
//
// channel_map: map from channel id to producer channel; channel_map_ is initialized
//              from it and is consulted by ShouldFlowControl() to refresh a channel.
// step:        stored in consumed_step_; ShouldFlowControl() adds it to a channel's
//              consumed seq id to compute that channel's target seq id.
UnconsumedSeqFlowControl::UnconsumedSeqFlowControl(
    std::unordered_map<ObjectID, std::shared_ptr<ProducerChannel>> &channel_map,
    uint32_t step)
    : channel_map_(channel_map), consumed_step_(step) {}
// Decides whether writing to the given channel should be paused.
//
// Nothing is touched while current_seq_id is still below target_seq_id. Once the
// target has been reached, the channel is asked to refresh its info, target_seq_id
// is recomputed as consumed_seq_id + consumed_step_, and the comparison is repeated.
//
// channel_info: producer-side state of the channel being written to; its
//               queue_info.target_seq_id is overwritten when the target was reached.
// Returns true when current_seq_id has still reached the recomputed target (the
// caller should stop writing), false otherwise.
bool UnconsumedSeqFlowControl::ShouldFlowControl(ProducerChannelInfo &channel_info) {
  auto &queue_info = channel_info.queue_info;
  if (queue_info.target_seq_id <= channel_info.current_seq_id) {
    // Look the channel up with find(): operator[] would silently insert an empty
    // entry for an unknown id and the call below would dereference a null pointer.
    auto channel_it = channel_map_.find(channel_info.channel_id);
    if (channel_it != channel_map_.end() && channel_it->second) {
      // NOTE(review): presumably this updates channel_info.queue_info (e.g. the
      // consumed seq id) -- confirm against ProducerChannel::RefreshChannelInfo.
      channel_it->second->RefreshChannelInfo();
    } else {
      // Without a channel there is nothing to refresh; fall through and decide on
      // the queue info that is already present in channel_info.
      STREAMING_LOG(WARNING) << "Flow control can not refresh channel info, no "
                                "channel found for q id => "
                             << channel_info.channel_id;
    }
    // The target seq id is the upper limit the writer may reach for now:
    // step messages beyond what has been consumed.
    queue_info.target_seq_id = queue_info.consumed_seq_id + consumed_step_;
    STREAMING_LOG(DEBUG) << "Flow control stop writing to downstream, current max id => "
                         << channel_info.current_seq_id << ", target seq id => "
                         << queue_info.target_seq_id << ", consumed_id => "
                         << queue_info.consumed_seq_id << ", q id => "
                         << channel_info.channel_id
                         << ". if this log keeps printing, it means something wrong "
                            "with queue's info API, or downstream node is not "
                            "consuming data.";
    // Re-check against the recomputed target: only throttle if it is still reached.
    if (queue_info.target_seq_id <= channel_info.current_seq_id) {
      return true;
    }
  }
  return false;
}
} // namespace streaming
} // namespace ray