[Streaming] Support streaming flow control (#7152)

* streaming writer use event driven model.

* add RefreshChannelInfo

* fix name

* minor changes according reviewer comments

* Fix according to reviewer's comments

* fix bazel lint

* code polished

* Add more comments

* rename Stop & Start of EventQueue to Freeze and Unfreeze.

* add override

* fix

* fix return value

* support flow control

* add flow control ut in mock transfer

* minor changes according to comments

* add java and python worker adaption

Co-authored-by: wanxing <wanxing.wwx@alibaba-inc.com>
This commit is contained in:
Lingxuan Zuo
2020-02-24 23:48:04 +08:00
committed by GitHub
parent 7e115490d9
commit f995099e00
16 changed files with 410 additions and 112 deletions
@@ -40,5 +40,10 @@ public class Config {
// operator type
public static final String OPERATOR_TYPE = "operator_type";
// flow control
public static final String FLOW_CONTROL_TYPE = "streaming.flow_control_type";
public static final String WRITER_CONSUMED_STEP = "streaming.writer.consumed_step";
public static final String READER_CONSUMED_STEP = "streaming.reader.consumed_step";
}
@@ -31,6 +31,19 @@ public class ChannelUtils {
builder.setEmptyMessageInterval(
Integer.parseInt(conf.get(Config.STREAMING_EMPTY_MESSAGE_INTERVAL)));
}
if (conf.containsKey(Config.FLOW_CONTROL_TYPE)) {
builder.setFlowControlType(
Streaming.FlowControlType.forNumber(
Integer.parseInt(conf.get(Config.FLOW_CONTROL_TYPE))));
}
if (conf.containsKey(Config.WRITER_CONSUMED_STEP)) {
builder.setWriterConsumedStep(
Integer.parseInt(conf.get(Config.WRITER_CONSUMED_STEP)));
}
if (conf.containsKey(Config.READER_CONSUMED_STEP)) {
builder.setReaderConsumedStep(
Integer.parseInt(conf.get(Config.READER_CONSUMED_STEP)));
}
Streaming.StreamingConfig streamingConf = builder.build();
LOGGER.info("Streaming native conf {}", streamingConf.toString());
return streamingConf.toByteArray();
+5
View File
@@ -21,3 +21,8 @@ class Config:
# operator type
OPERATOR_TYPE = "operator_type"
# flow control
FLOW_CONTROL_TYPE = "streaming.flow_control_type"
WRITER_CONSUMED_STEP = "streaming.writer.consumed_step"
READER_CONSUMED_STEP = "streaming.reader.consumed_step"
+8
View File
@@ -276,6 +276,14 @@ def _to_native_conf(conf):
if Config.STREAMING_EMPTY_MESSAGE_INTERVAL in conf:
config.empty_message_interval = \
conf[Config.STREAMING_EMPTY_MESSAGE_INTERVAL]
if Config.FLOW_CONTROL_TYPE in conf:
conf.flow_control_type = conf[Config.FLOW_CONTROL_TYPE]
if Config.WRITER_CONSUMED_STEP in conf:
conf.writer_consumed_step = \
conf[Config.WRITER_CONSUMED_STEP]
if Config.READER_CONSUMED_STEP in conf:
conf.reader_consumed_step = \
conf[Config.READER_CONSUMED_STEP]
logger.info("conf: %s", str(config))
return config.SerializeToString()
+32 -15
View File
@@ -218,9 +218,10 @@ struct MockQueueItem {
class MockQueue {
public:
std::unordered_map<ObjectID, std::shared_ptr<AbstractRingBuffer<MockQueueItem>>>
message_buffer_;
message_bffer;
std::unordered_map<ObjectID, std::shared_ptr<AbstractRingBuffer<MockQueueItem>>>
consumed_buffer_;
consumed_buffer;
std::unordered_map<ObjectID, StreamingQueueInfo> queue_info_map;
static std::mutex mutex;
static MockQueue &GetMockQueue() {
static MockQueue mock_queue;
@@ -232,25 +233,25 @@ std::mutex MockQueue::mutex;
StreamingStatus MockProducer::CreateTransferChannel() {
std::unique_lock<std::mutex> lock(MockQueue::mutex);
MockQueue &mock_queue = MockQueue::GetMockQueue();
mock_queue.message_buffer_[channel_info_.channel_id] =
std::make_shared<RingBufferImplThreadSafe<MockQueueItem>>(500);
mock_queue.consumed_buffer_[channel_info_.channel_id] =
std::make_shared<RingBufferImplThreadSafe<MockQueueItem>>(500);
mock_queue.message_bffer[channel_info_.channel_id] =
std::make_shared<RingBufferImplThreadSafe<MockQueueItem>>(10000);
mock_queue.consumed_buffer[channel_info_.channel_id] =
std::make_shared<RingBufferImplThreadSafe<MockQueueItem>>(10000);
return StreamingStatus::OK;
}
StreamingStatus MockProducer::DestroyTransferChannel() {
std::unique_lock<std::mutex> lock(MockQueue::mutex);
MockQueue &mock_queue = MockQueue::GetMockQueue();
mock_queue.message_buffer_.erase(channel_info_.channel_id);
mock_queue.consumed_buffer_.erase(channel_info_.channel_id);
mock_queue.message_bffer.erase(channel_info_.channel_id);
mock_queue.consumed_buffer.erase(channel_info_.channel_id);
return StreamingStatus::OK;
}
StreamingStatus MockProducer::ProduceItemToChannel(uint8_t *data, uint32_t data_size) {
std::unique_lock<std::mutex> lock(MockQueue::mutex);
MockQueue &mock_queue = MockQueue::GetMockQueue();
auto &ring_buffer = mock_queue.message_buffer_[channel_info_.channel_id];
auto &ring_buffer = mock_queue.message_bffer[channel_info_.channel_id];
if (ring_buffer->Full()) {
return StreamingStatus::OutOfMemory;
}
@@ -260,6 +261,14 @@ StreamingStatus MockProducer::ProduceItemToChannel(uint8_t *data, uint32_t data_
item.data_size = data_size;
std::memcpy(item.data.get(), data, data_size);
ring_buffer->Push(item);
mock_queue.queue_info_map[channel_info_.channel_id].last_seq_id = item.seq_id;
return StreamingStatus::OK;
}
StreamingStatus MockProducer::RefreshChannelInfo() {
MockQueue &mock_queue = MockQueue::GetMockQueue();
channel_info_.queue_info.consumed_seq_id =
mock_queue.queue_info_map[channel_info_.channel_id].consumed_seq_id;
return StreamingStatus::OK;
}
@@ -269,16 +278,16 @@ StreamingStatus MockConsumer::ConsumeItemFromChannel(uint64_t &offset_id, uint8_
std::unique_lock<std::mutex> lock(MockQueue::mutex);
MockQueue &mock_queue = MockQueue::GetMockQueue();
auto &channel_id = channel_info_.channel_id;
if (mock_queue.message_buffer_.find(channel_id) == mock_queue.message_buffer_.end()) {
if (mock_queue.message_bffer.find(channel_id) == mock_queue.message_bffer.end()) {
return StreamingStatus::NoSuchItem;
}
if (mock_queue.message_buffer_[channel_id]->Empty()) {
if (mock_queue.message_bffer[channel_id]->Empty()) {
return StreamingStatus::NoSuchItem;
}
MockQueueItem item = mock_queue.message_buffer_[channel_id]->Front();
mock_queue.message_buffer_[channel_id]->Pop();
mock_queue.consumed_buffer_[channel_id]->Push(item);
MockQueueItem item = mock_queue.message_bffer[channel_id]->Front();
mock_queue.message_bffer[channel_id]->Pop();
mock_queue.consumed_buffer[channel_id]->Push(item);
offset_id = item.seq_id;
data = item.data.get();
data_size = item.data_size;
@@ -289,10 +298,18 @@ StreamingStatus MockConsumer::NotifyChannelConsumed(uint64_t offset_id) {
std::unique_lock<std::mutex> lock(MockQueue::mutex);
MockQueue &mock_queue = MockQueue::GetMockQueue();
auto &channel_id = channel_info_.channel_id;
auto &ring_buffer = mock_queue.consumed_buffer_[channel_id];
auto &ring_buffer = mock_queue.consumed_buffer[channel_id];
while (!ring_buffer->Empty() && ring_buffer->Front().seq_id <= offset_id) {
ring_buffer->Pop();
}
mock_queue.queue_info_map[channel_id].consumed_seq_id = offset_id;
return StreamingStatus::OK;
}
StreamingStatus MockConsumer::RefreshChannelInfo() {
MockQueue &mock_queue = MockQueue::GetMockQueue();
channel_info_.queue_info.last_seq_id =
mock_queue.queue_info_map[channel_info_.channel_id].last_seq_id;
return StreamingStatus::OK;
}
+8 -6
View File
@@ -51,11 +51,13 @@ struct ConsumerChannelInfo {
StreamingQueueInfo queue_info;
uint64_t last_queue_item_delay;
uint64_t last_queue_item_latency;
uint64_t last_queue_target_diff;
uint64_t get_queue_item_times;
uint64_t last_queue_item_delay = 0;
uint64_t last_queue_item_latency = 0;
uint64_t last_queue_target_diff = 0;
uint64_t get_queue_item_times = 0;
ActorID actor_id;
// Total count of notify request.
uint64_t notify_cnt = 0;
};
/// Two types of channel are presented:
@@ -162,7 +164,7 @@ class MockProducer : public ProducerChannel {
return StreamingStatus::OK;
}
StreamingStatus RefreshChannelInfo() override { return StreamingStatus::OK; }
StreamingStatus RefreshChannelInfo() override;
StreamingStatus ProduceItemToChannel(uint8_t *data, uint32_t data_size) override;
@@ -182,7 +184,7 @@ class MockConsumer : public ConsumerChannel {
uint64_t checkpoint_offset) override {
return StreamingStatus::OK;
}
StreamingStatus RefreshChannelInfo() override { return StreamingStatus::OK; }
StreamingStatus RefreshChannelInfo() override;
StreamingStatus ConsumeItemFromChannel(uint64_t &offset_id, uint8_t *&data,
uint32_t &data_size, uint32_t timeout) override;
StreamingStatus NotifyChannelConsumed(uint64_t offset_id) override;
+15
View File
@@ -37,6 +37,21 @@ void StreamingConfig::FromProto(const uint8_t *data, uint32_t size) {
if (config.empty_message_interval() != 0) {
SetEmptyMessageTimeInterval(config.empty_message_interval());
}
if (config.flow_control_type() != proto::FlowControlType::UNKNOWN_FLOW_CONTROL_TYPE) {
SetFlowControlType(config.flow_control_type());
}
if (config.writer_consumed_step() != 0) {
SetWriterConsumedStep(config.writer_consumed_step());
}
if (config.reader_consumed_step() != 0) {
SetReaderConsumedStep(config.reader_consumed_step());
}
if (config.event_driven_flow_control_interval()) {
SetReaderConsumedStep(config.event_driven_flow_control_interval());
}
STREAMING_CHECK(writer_consumed_step_ >= reader_consumed_step_)
<< "Writer consuemd step " << writer_consumed_step_
<< "can not be smaller then reader consumed step " << reader_consumed_step_;
}
uint32_t StreamingConfig::GetRingBufferCapacity() const { return ring_buffer_capacity_; }
+17
View File
@@ -33,6 +33,17 @@ class StreamingConfig {
std::string task_job_id_ = JobID::Nil().Hex();
// Default flow control type is unconsumed sequence flow control. More detail
// introducation and implemention in ray/streaming/src/flow_control.h.
streaming::proto::FlowControlType flow_control_type_ =
streaming::proto::FlowControlType::UnconsumedSeqFlowControl;
// Default writer and reader consumed step.
uint32_t writer_consumed_step_ = 1000;
uint32_t reader_consumed_step_ = 100;
uint32_t event_driven_flow_control_interval_ = 1;
public:
void FromProto(const uint8_t *, uint32_t size);
@@ -46,6 +57,12 @@ class StreamingConfig {
DECL_GET_SET_PROPERTY(uint32_t, EmptyMessageTimeInterval, empty_message_time_interval_)
DECL_GET_SET_PROPERTY(streaming::proto::OperatorType, OperatorType, operator_type_)
DECL_GET_SET_PROPERTY(const std::string &, JobName, job_name_)
DECL_GET_SET_PROPERTY(uint32_t, WriterConsumedStep, writer_consumed_step_)
DECL_GET_SET_PROPERTY(uint32_t, ReaderConsumedStep, reader_consumed_step_)
DECL_GET_SET_PROPERTY(streaming::proto::FlowControlType, FlowControlType,
flow_control_type_)
DECL_GET_SET_PROPERTY(uint32_t, EventDrivenFlowControlInterval,
event_driven_flow_control_interval_)
uint32_t GetRingBufferCapacity() const;
/// Note(lingxuan.zlx), RingBufferCapacity's valid range is from 1 to
+34 -4
View File
@@ -199,10 +199,9 @@ StreamingStatus DataReader::GetMergedMessageBundle(std::shared_ptr<DataBundle> &
StreamingStatus DataReader::GetBundle(const uint32_t timeout_ms,
std::shared_ptr<DataBundle> &message) {
// Notify consumed every item in this mode.
// Notify upstream that last fetched item has been consumed.
if (last_fetched_queue_item_) {
NotifyConsumedItem(channel_info_map_[last_fetched_queue_item_->from],
last_fetched_queue_item_->seq_id);
NotifyConsumed(last_fetched_queue_item_);
}
/// DataBundle will be returned to the upper layer in the following cases:
@@ -246,7 +245,7 @@ StreamingStatus DataReader::GetBundle(const uint32_t timeout_ms,
RETURN_IF_NOT_OK(GetMergedMessageBundle(message, is_valid_break));
if (!is_valid_break) {
empty_bundle_cnt++;
NotifyConsumedItem(channel_info_map_[message->from], message->seq_id);
NotifyConsumed(message);
}
}
last_message_latency_ += current_time_ms() - start_time;
@@ -282,6 +281,37 @@ void DataReader::Stop() {
runtime_context_->SetRuntimeStatus(RuntimeStatus::Interrupted);
}
void DataReader::NotifyConsumed(std::shared_ptr<DataBundle> &message) {
auto &channel_info = channel_info_map_[message->from];
auto &queue_info = channel_info.queue_info;
channel_info.notify_cnt++;
if (queue_info.target_seq_id <= message->seq_id) {
NotifyConsumedItem(channel_info, message->seq_id);
channel_map_[channel_info.channel_id]->RefreshChannelInfo();
if (queue_info.last_seq_id != QUEUE_INVALID_SEQ_ID) {
uint64_t original_target_seq_id = queue_info.target_seq_id;
queue_info.target_seq_id = std::min(
queue_info.last_seq_id,
message->seq_id + runtime_context_->GetConfig().GetReaderConsumedStep());
channel_info.last_queue_target_diff =
queue_info.target_seq_id - original_target_seq_id;
} else {
STREAMING_LOG(WARNING) << "[Reader] [QueueInfo] channel id " << message->from
<< ", last seq id " << queue_info.last_seq_id;
}
STREAMING_LOG(DEBUG) << "[Reader] [Consumed] Trigger notify consumed"
<< ", channel id => " << message->from << ", last seq id => "
<< queue_info.last_seq_id << ", target seq id => "
<< queue_info.target_seq_id << ", consumed seq id => "
<< message->seq_id << ", last message id => "
<< message->meta->GetLastMessageId() << ", bundle type => "
<< static_cast<uint32_t>(message->meta->GetBundleType())
<< ", last message bundle ts => "
<< message->meta->GetMessageBundleTs();
}
}
bool StreamingReaderMsgPtrComparator::operator()(const std::shared_ptr<DataBundle> &a,
const std::shared_ptr<DataBundle> &b) {
STREAMING_CHECK(a->meta);
+5 -3
View File
@@ -98,11 +98,13 @@ class DataReader {
/// Notify input queues to clear data whose seq id is equal or less than offset.
/// It's used when checkpoint is done.
/// \param channel_info
/// \param offset
///
/// \param channel_info consumer's channel info
/// \param offset consumed channel offset
void NotifyConsumedItem(ConsumerChannelInfo &channel_info, uint64_t offset);
//// Notify message related channel to clear data.
void NotifyConsumed(std::shared_ptr<DataBundle> &message);
private:
/// Create channels and connect to all upstream.
StreamingStatus InitChannel();
+79 -46
View File
@@ -15,50 +15,6 @@
namespace ray {
namespace streaming {
void DataWriter::WriterLoopForward() {
STREAMING_CHECK(RuntimeStatus::Running == runtime_context_->GetRuntimeStatus());
while (true) {
int64_t min_passby_message_ts = std::numeric_limits<int64_t>::max();
uint32_t empty_messge_send_count = 0;
for (auto &output_queue : output_queue_ids_) {
if (RuntimeStatus::Running != runtime_context_->GetRuntimeStatus()) {
return;
}
ProducerChannelInfo &channel_info = channel_info_map_[output_queue];
bool is_push_empty_message = false;
StreamingStatus write_status =
WriteChannelProcess(channel_info, &is_push_empty_message);
int64_t current_ts = current_time_ms();
if (StreamingStatus::OK == write_status) {
channel_info.message_pass_by_ts = current_ts;
if (is_push_empty_message) {
min_passby_message_ts =
std::min(channel_info.message_pass_by_ts, min_passby_message_ts);
empty_messge_send_count++;
}
} else if (StreamingStatus::FullChannel == write_status) {
} else {
if (StreamingStatus::EmptyRingBuffer != write_status) {
STREAMING_LOG(DEBUG) << "write buffer status => "
<< static_cast<uint32_t>(write_status)
<< ", is push empty message => " << is_push_empty_message;
}
}
}
if (empty_messge_send_count == output_queue_ids_.size()) {
// Sleep if empty message was sent in all channel.
uint64_t sleep_time_ = current_time_ms() - min_passby_message_ts;
// Sleep_time can be bigger than time interval because of network jitter.
if (sleep_time_ <= runtime_context_->GetConfig().GetEmptyMessageTimeInterval()) {
std::this_thread::sleep_for(std::chrono::milliseconds(
runtime_context_->GetConfig().GetEmptyMessageTimeInterval() - sleep_time_));
}
}
}
}
StreamingStatus DataWriter::WriteChannelProcess(ProducerChannelInfo &channel_info,
bool *is_empty_message) {
// No message in buffer, empty message will be sent to downstream queue.
@@ -100,6 +56,8 @@ void DataWriter::Run() {
// Enable empty message timer after writer running.
empty_message_thread_ =
std::make_shared<std::thread>(&DataWriter::EmptyMessageTimerCallback, this);
flow_control_thread_ =
std::make_shared<std::thread>(&DataWriter::FlowControlTimer, this);
}
/// Since every memory ring buffer's size is limited, when the writing buffer is
@@ -192,6 +150,16 @@ StreamingStatus DataWriter::Init(const std::vector<ObjectID> &queue_id_vec,
return status;
}
}
switch (runtime_context_->GetConfig().GetFlowControlType()) {
case proto::FlowControlType::UnconsumedSeqFlowControl:
flow_controller_ = std::make_shared<UnconsumedSeqFlowControl>(
channel_map_, runtime_context_->GetConfig().GetWriterConsumedStep());
break;
default:
flow_controller_ = std::make_shared<NoFlowControl>();
break;
}
// Register empty event and user event to event server.
event_service_ = std::make_shared<EventService>();
event_service_->Register(
@@ -199,6 +167,8 @@ StreamingStatus DataWriter::Init(const std::vector<ObjectID> &queue_id_vec,
std::bind(&DataWriter::SendEmptyToChannel, this, std::placeholders::_1));
event_service_->Register(EventType::UserEvent, std::bind(&DataWriter::WriteAllToChannel,
this, std::placeholders::_1));
event_service_->Register(EventType::FlowEvent, std::bind(&DataWriter::WriteAllToChannel,
this, std::placeholders::_1));
runtime_context_->SetRuntimeStatus(RuntimeStatus::Running);
return StreamingStatus::OK;
@@ -219,6 +189,10 @@ DataWriter::~DataWriter() {
STREAMING_LOG(INFO) << "Empty message thread waiting for join";
empty_message_thread_->join();
}
if (flow_control_thread_->joinable()) {
STREAMING_LOG(INFO) << "FlowControl timer thread waiting for join";
flow_control_thread_->join();
}
int user_event_count = 0;
int empty_event_count = 0;
int flow_control_event_count = 0;
@@ -358,6 +332,16 @@ bool DataWriter::WriteAllToChannel(ProducerChannelInfo *info) {
if (RuntimeStatus::Running != runtime_context_->GetRuntimeStatus()) {
return false;
}
// Stop to write remained messages to channel if channel has been blocked by
// flow control.
if (channel_info.flow_control) {
break;
}
// Check this channel is blocked by flow control or not.
if (flow_controller_->ShouldFlowControl(channel_info)) {
channel_info.flow_control = true;
break;
}
uint64_t ring_buffer_remain = channel_info.writer_ring_buffer->Size();
StreamingStatus write_status = WriteBufferToChannel(channel_info, ring_buffer_remain);
int64_t current_ts = current_time_ms();
@@ -365,11 +349,11 @@ bool DataWriter::WriteAllToChannel(ProducerChannelInfo *info) {
channel_info.message_pass_by_ts = current_ts;
} else if (StreamingStatus::FullChannel == write_status ||
StreamingStatus::OutOfMemory == write_status) {
channel_info.flow_control = true;
++channel_info.queue_full_cnt;
STREAMING_LOG(DEBUG) << "FullChannel after writing to channel, queue_full_cnt:"
<< channel_info.queue_full_cnt;
// TODO(lingxuan.zlx): we should notify consumed status to channel when
// flow control is supported.
RefreshChannelAndNotifyConsumed(channel_info);
} else if (StreamingStatus::EmptyRingBuffer != write_status) {
STREAMING_LOG(INFO) << channel_info.channel_id
<< ":something wrong when WriteToQueue "
@@ -447,5 +431,54 @@ void DataWriter::EmptyMessageTimerCallback() {
}
}
void DataWriter::RefreshChannelAndNotifyConsumed(ProducerChannelInfo &channel_info) {
// Refresh current downstream consumed seq id.
channel_map_[channel_info.channel_id]->RefreshChannelInfo();
// Notify the consumed information to local channel.
NotifyConsumedItem(channel_info, channel_info.queue_info.consumed_seq_id);
}
void DataWriter::NotifyConsumedItem(ProducerChannelInfo &channel_info, uint32_t offset) {
if (offset > channel_info.current_seq_id) {
STREAMING_LOG(WARNING) << "Can not notify consumed this offset " << offset
<< " that's out of range, max seq id "
<< channel_info.current_seq_id;
} else {
channel_map_[channel_info.channel_id]->NotifyChannelConsumed(offset);
}
}
void DataWriter::FlowControlTimer() {
std::chrono::milliseconds MockTimer(
runtime_context_->GetConfig().GetEventDrivenFlowControlInterval());
while (true) {
if (runtime_context_->GetRuntimeStatus() != RuntimeStatus::Running) {
return;
}
for (const auto &output_queue : output_queue_ids_) {
if (runtime_context_->GetRuntimeStatus() != RuntimeStatus::Running) {
return;
}
ProducerChannelInfo &channel_info = channel_info_map_[output_queue];
if (!channel_info.flow_control) {
continue;
}
if (!flow_controller_->ShouldFlowControl(channel_info)) {
channel_info.flow_control = false;
Event event{&channel_info, EventType::FlowEvent,
channel_info.writer_ring_buffer->IsFull()};
event_service_->Push(event);
++channel_info.flow_control_cnt;
}
}
std::this_thread::sleep_for(MockTimer);
}
}
void DataWriter::GetOffsetInfo(
std::unordered_map<ObjectID, ProducerChannelInfo> *&offset_map) {
offset_map = &channel_info_map_;
}
} // namespace streaming
} // namespace ray
+45 -37
View File
@@ -10,6 +10,7 @@
#include "channel.h"
#include "config/streaming_config.h"
#include "event_service.h"
#include "flow_control.h"
#include "message/message_bundle.h"
#include "runtime_context.h"
@@ -30,6 +31,40 @@ namespace streaming {
/// accordingly. It will sleep for a short interval to save cpu if all ring
/// buffers have no data in that moment.
class DataWriter {
public:
explicit DataWriter(std::shared_ptr<RuntimeContext> &runtime_context);
virtual ~DataWriter();
/// Streaming writer client initialization.
/// \param queue_id_vec queue id vector
/// \param channel_message_id_vec channel seq id is related with message checkpoint
/// \param queue_size queue size (memory size not length)
StreamingStatus Init(const std::vector<ObjectID> &channel_ids,
const std::vector<ActorID> &actor_ids,
const std::vector<uint64_t> &channel_message_id_vec,
const std::vector<uint64_t> &queue_size_vec);
/// To increase throughout, we employed an output buffer for message transformation,
/// which means we merge a lot of message to a message bundle and no message will be
/// pushed into queue directly util daemon thread does this action.
/// Additionally, writing will block when buffer ring is full intentionly.
/// \param q_id, destination channel id
/// \param data, pointer of raw data
/// \param data_size, raw data size
/// \param message_type
/// \return message seq iq
uint64_t WriteMessageToBufferRing(
const ObjectID &q_id, uint8_t *data, uint32_t data_size,
StreamingMessageType message_type = StreamingMessageType::Message);
void Run();
void Stop();
/// Get offset information about channels for checkpoint.
/// \param offset_map (return value)
void GetOffsetInfo(std::unordered_map<ObjectID, ProducerChannelInfo> *&offset_map);
private:
bool IsMessageAvailableInBuffer(ProducerChannelInfo &channel_info);
@@ -42,16 +77,6 @@ class DataWriter {
StreamingStatus WriteBufferToChannel(ProducerChannelInfo &channel_info,
uint64_t &buffer_remain);
/// Start the loop forward thread for collecting messages from all channels.
/// Invoking stack:
/// WriterLoopForward
/// -- WriteChannelProcess
/// -- WriteBufferToChannel
/// -- CollectFromRingBuffer
/// -- WriteTransientBufferToChannel
/// -- WriteEmptyMessage(if WriteChannelProcess return empty state)
void WriterLoopForward();
/// Push empty message when no valid message or bundle was produced each time
/// interval.
/// \param channel_info
@@ -79,42 +104,25 @@ class DataWriter {
void EmptyMessageTimerCallback();
public:
explicit DataWriter(std::shared_ptr<RuntimeContext> &runtime_context);
virtual ~DataWriter();
/// Notify channel consumed refreshing downstream queue stats.
void RefreshChannelAndNotifyConsumed(ProducerChannelInfo &channel_info);
/// Streaming writer client initialization.
/// \param queue_id_vec queue id vector
/// \param channel_message_id_vec channel seq id is related with message checkpoint
/// \param queue_size queue size (memory size not length)
StreamingStatus Init(const std::vector<ObjectID> &channel_ids,
const std::vector<ActorID> &actor_ids,
const std::vector<uint64_t> &channel_message_id_vec,
const std::vector<uint64_t> &queue_size_vec);
/// Notify channel consumed by given offset.
void NotifyConsumedItem(ProducerChannelInfo &channel_info, uint32_t offset);
/// To increase throughout, we employed an output buffer for message transformation,
/// which means we merge a lot of message to a message bundle and no message will be
/// pushed into queue directly util daemon thread does this action.
/// Additionally, writing will block when buffer ring is full intentionly.
/// \param q_id
/// \param data
/// \param data_size
/// \param message_type
/// \return message seq iq
uint64_t WriteMessageToBufferRing(
const ObjectID &q_id, uint8_t *data, uint32_t data_size,
StreamingMessageType message_type = StreamingMessageType::Message);
void Run();
void Stop();
void FlowControlTimer();
private:
std::shared_ptr<EventService> event_service_;
std::shared_ptr<std::thread> empty_message_thread_;
std::shared_ptr<std::thread> flow_control_thread_;
// One channel have unique identity.
std::vector<ObjectID> output_queue_ids_;
// Flow controller makes a decision when it's should be blocked and avoid
// unnecessary overflow.
std::shared_ptr<FlowControl> flow_controller_;
protected:
std::unordered_map<ObjectID, ProducerChannelInfo> channel_info_map_;
+35
View File
@@ -0,0 +1,35 @@
#include "flow_control.h"
namespace ray {
namespace streaming {
UnconsumedSeqFlowControl::UnconsumedSeqFlowControl(
std::unordered_map<ObjectID, std::shared_ptr<ProducerChannel>> &channel_map,
uint32_t step)
: channel_map_(channel_map), consumed_step_(step){};
bool UnconsumedSeqFlowControl::ShouldFlowControl(ProducerChannelInfo &channel_info) {
auto &queue_info = channel_info.queue_info;
if (queue_info.target_seq_id <= channel_info.current_seq_id) {
channel_map_[channel_info.channel_id]->RefreshChannelInfo();
// Target seq id is maximum upper limit in current condition.
channel_info.queue_info.target_seq_id =
channel_info.queue_info.consumed_seq_id + consumed_step_;
STREAMING_LOG(DEBUG) << "Flow control stop writing to downstream, current max id => "
<< channel_info.current_seq_id << ", target seq id => "
<< queue_info.target_seq_id << ", consumed_id => "
<< queue_info.consumed_seq_id << ", q id => "
<< channel_info.channel_id
<< ". if this log keeps printing, it means something wrong "
"with queue's info API, or downstream node is not "
"consuming data.";
// Double check after refreshing if target seq id is changed.
if (queue_info.target_seq_id <= channel_info.current_seq_id) {
return true;
}
}
return false;
}
} // namespace streaming
} // namespace ray
+45
View File
@@ -0,0 +1,45 @@
#ifndef RAY_STREAMING_FLOW_CONTROL_H
#define RAY_STREAMING_FLOW_CONTROL_H
#include "channel.h"
namespace ray {
namespace streaming {
class ProducerTransfer;
/// We devise a flow control system in queue channel, and that's called flow
/// control by unconsumed seq. Upstream worker will detect consumer statistics via
/// api so it can keep fixed length messages in this process, which makes a
/// continuous datastream in channel or on the transporting way, then downstream
/// can read them from channel immediately.
/// To debug or compare with theses flow control methods, we also support
/// no-flow-control that will do nothing in transporting.
class FlowControl {
public:
virtual ~FlowControl() = default;
virtual bool ShouldFlowControl(ProducerChannelInfo &channel_info) = 0;
};
class NoFlowControl : public FlowControl {
public:
bool ShouldFlowControl(ProducerChannelInfo &channel_info) { return false; }
~NoFlowControl() = default;
};
class UnconsumedSeqFlowControl : public FlowControl {
public:
UnconsumedSeqFlowControl(
std::unordered_map<ObjectID, std::shared_ptr<ProducerChannel>> &channel_map,
uint32_t step);
~UnconsumedSeqFlowControl() = default;
bool ShouldFlowControl(ProducerChannelInfo &channel_info);
private:
/// NOTE(wanxing.wwx) Reference to channel_map_ variable in DataWriter.
/// Flow-control is checked in FlowControlThread, so channel_map_ is accessed
/// in multithread situation. Especially, while rescaling, channel_map_ maybe
/// changed. But for now, FlowControlThread is stopped before rescaling.
std::unordered_map<ObjectID, std::shared_ptr<ProducerChannel>> &channel_map_;
uint32_t consumed_step_;
};
} // namespace streaming
} // namespace ray
#endif // RAY_STREAMING_FLOW_CONTROL_H
+10
View File
@@ -11,6 +11,12 @@ enum OperatorType {
SINK = 3;
}
enum FlowControlType {
UNKNOWN_FLOW_CONTROL_TYPE = 0;
UnconsumedSeqFlowControl = 1;
NoFlowControl = 2;
}
// all string in this message is ASCII string
message StreamingConfig {
string job_name = 1;
@@ -20,4 +26,8 @@ message StreamingConfig {
OperatorType role = 5;
uint32 ring_buffer_capacity = 6;
uint32 empty_message_interval = 7;
FlowControlType flow_control_type = 8;
uint32 writer_consumed_step = 9;
uint32 reader_consumed_step = 10;
uint32 event_driven_flow_control_interval = 11;
}
+54 -1
View File
@@ -37,7 +37,7 @@ TEST(StreamingMockTransfer, mock_produce_consume) {
class StreamingTransferTest : public ::testing::Test {
public:
StreamingTransferTest() {
std::shared_ptr<RuntimeContext> runtime_context(new RuntimeContext());
runtime_context = std::make_shared<RuntimeContext>();
runtime_context->MarkMockTest();
writer = std::make_shared<DataWriter>(runtime_context);
reader = std::make_shared<DataReader>(runtime_context);
@@ -64,6 +64,7 @@ class StreamingTransferTest : public ::testing::Test {
std::shared_ptr<DataWriter> writer;
std::shared_ptr<DataReader> reader;
std::vector<ObjectID> queue_vec;
std::shared_ptr<RuntimeContext> runtime_context;
};
TEST_F(StreamingTransferTest, exchange_single_channel_test) {
@@ -130,6 +131,58 @@ TEST_F(StreamingTransferTest, exchange_consumed_test) {
write_thread.join();
}
TEST_F(StreamingTransferTest, flow_control_test) {
InitTransfer();
writer->Run();
uint32_t data_size = 8196;
std::shared_ptr<uint8_t> data(new uint8_t[data_size]);
auto func = [data, data_size](int index) { std::fill_n(data.get(), data_size, index); };
size_t num = 10000;
std::thread write_thread([this, data, data_size, &func, num]() {
for (size_t i = 0; i < num; ++i) {
func(i);
writer->WriteMessageToBufferRing(queue_vec[0], data.get(), data_size);
}
});
std::unordered_map<ObjectID, ProducerChannelInfo> *writer_offset_info = nullptr;
std::unordered_map<ObjectID, ConsumerChannelInfo> *reader_offset_info = nullptr;
writer->GetOffsetInfo(writer_offset_info);
reader->GetOffsetInfo(reader_offset_info);
uint32_t writer_step = runtime_context->GetConfig().GetWriterConsumedStep();
uint32_t reader_step = runtime_context->GetConfig().GetReaderConsumedStep();
uint64_t &writer_current_seq_id = (*writer_offset_info)[queue_vec[0]].current_seq_id;
uint64_t &writer_current_message_id =
(*writer_offset_info)[queue_vec[0]].current_message_id;
uint64_t &reader_target_seq_id =
(*reader_offset_info)[queue_vec[0]].queue_info.target_seq_id;
while (writer_current_seq_id < writer_step) {
STREAMING_LOG(INFO) << "Writer currrent seq id " << writer_current_seq_id
<< " message " << writer_current_message_id << " consumer step "
<< writer_step;
std::this_thread::sleep_for(
std::chrono::milliseconds(StreamingConfig::TIME_WAIT_UINT));
}
std::list<StreamingMessagePtr> read_message_list;
while (read_message_list.size() < num) {
std::shared_ptr<DataBundle> msg;
reader->GetBundle(5000, msg);
StreamingMessageBundlePtr bundle_ptr = StreamingMessageBundle::FromBytes(msg->data);
auto &message_list = bundle_ptr->GetMessageList();
std::copy(message_list.begin(), message_list.end(),
std::back_inserter(read_message_list));
ASSERT_GE(writer_step, writer_current_seq_id - msg->seq_id);
ASSERT_GE(msg->seq_id + reader_step, reader_target_seq_id);
}
int index = 0;
for (auto &message : read_message_list) {
func(index++);
EXPECT_EQ(std::memcmp(message->RawData(), data.get(), data_size), 0);
}
write_thread.join();
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();