From f995099e003a137308faeb2107e1806400a01929 Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Mon, 24 Feb 2020 23:48:04 +0800 Subject: [PATCH] [Streaming] Support streaming flow control (#7152) * streaming writer use event driven model. * add RefreshChannelInfo * fix name * minor changes according reviewer comments * Fix according to reviewer's comments * fix bazel lint * code polished * Add more comments * rename Stop & Start of EventQueue to Freeze and Unfreeze. * add override * fix * fix return value * support flow control * add flow control ut in mock transfer * minor changes according to comments * add java and python worker adaption Co-authored-by: wanxing --- .../java/org/ray/streaming/util/Config.java | 5 + .../runtime/transfer/ChannelUtils.java | 13 ++ streaming/python/config.py | 5 + streaming/python/runtime/transfer.py | 8 ++ streaming/src/channel.cc | 47 ++++--- streaming/src/channel.h | 14 +- streaming/src/config/streaming_config.cc | 15 +++ streaming/src/config/streaming_config.h | 17 +++ streaming/src/data_reader.cc | 38 +++++- streaming/src/data_reader.h | 8 +- streaming/src/data_writer.cc | 125 +++++++++++------- streaming/src/data_writer.h | 82 ++++++------ streaming/src/flow_control.cc | 35 +++++ streaming/src/flow_control.h | 45 +++++++ streaming/src/protobuf/streaming.proto | 10 ++ streaming/src/test/mock_transfer_tests.cc | 55 +++++++- 16 files changed, 410 insertions(+), 112 deletions(-) create mode 100644 streaming/src/flow_control.cc create mode 100644 streaming/src/flow_control.h diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/util/Config.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/util/Config.java index cad4dc99e..b8392e96b 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/util/Config.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/util/Config.java @@ -40,5 +40,10 @@ public class Config { // operator type public static final String OPERATOR_TYPE = "operator_type"; + // flow control + public static final String FLOW_CONTROL_TYPE = "streaming.flow_control_type"; + public static final String WRITER_CONSUMED_STEP = "streaming.writer.consumed_step"; + public static final String READER_CONSUMED_STEP = "streaming.reader.consumed_step"; + } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/transfer/ChannelUtils.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/transfer/ChannelUtils.java index 893c21397..bba6e3e5b 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/transfer/ChannelUtils.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/transfer/ChannelUtils.java @@ -31,6 +31,19 @@ public class ChannelUtils { builder.setEmptyMessageInterval( Integer.parseInt(conf.get(Config.STREAMING_EMPTY_MESSAGE_INTERVAL))); } + if (conf.containsKey(Config.FLOW_CONTROL_TYPE)) { + builder.setFlowControlType( + Streaming.FlowControlType.forNumber( + Integer.parseInt(conf.get(Config.FLOW_CONTROL_TYPE)))); + } + if (conf.containsKey(Config.WRITER_CONSUMED_STEP)) { + builder.setWriterConsumedStep( + Integer.parseInt(conf.get(Config.WRITER_CONSUMED_STEP))); + } + if (conf.containsKey(Config.READER_CONSUMED_STEP)) { + builder.setReaderConsumedStep( + Integer.parseInt(conf.get(Config.READER_CONSUMED_STEP))); + } Streaming.StreamingConfig streamingConf = builder.build(); LOGGER.info("Streaming native conf {}", streamingConf.toString()); return streamingConf.toByteArray(); diff --git a/streaming/python/config.py b/streaming/python/config.py index 8f7b5e941..e6d56488b 100644 --- a/streaming/python/config.py +++ b/streaming/python/config.py @@ -21,3 +21,8 @@ class Config: # operator type OPERATOR_TYPE = "operator_type" + + # flow control + FLOW_CONTROL_TYPE = "streaming.flow_control_type" + WRITER_CONSUMED_STEP = "streaming.writer.consumed_step" + READER_CONSUMED_STEP = "streaming.reader.consumed_step" diff --git a/streaming/python/runtime/transfer.py b/streaming/python/runtime/transfer.py index 4bb9b5f1d..9a65328dd 100644 --- a/streaming/python/runtime/transfer.py +++ b/streaming/python/runtime/transfer.py @@ -276,6 +276,14 @@ def _to_native_conf(conf): if Config.STREAMING_EMPTY_MESSAGE_INTERVAL in conf: config.empty_message_interval = \ conf[Config.STREAMING_EMPTY_MESSAGE_INTERVAL] + if Config.FLOW_CONTROL_TYPE in conf: + conf.flow_control_type = conf[Config.FLOW_CONTROL_TYPE] + if Config.WRITER_CONSUMED_STEP in conf: + conf.writer_consumed_step = \ + conf[Config.WRITER_CONSUMED_STEP] + if Config.READER_CONSUMED_STEP in conf: + conf.reader_consumed_step = \ + conf[Config.READER_CONSUMED_STEP] logger.info("conf: %s", str(config)) return config.SerializeToString() diff --git a/streaming/src/channel.cc b/streaming/src/channel.cc index 421517359..2d3c1271b 100644 --- a/streaming/src/channel.cc +++ b/streaming/src/channel.cc @@ -218,9 +218,10 @@ struct MockQueueItem { class MockQueue { public: std::unordered_map>> - message_buffer_; + message_bffer; std::unordered_map>> - consumed_buffer_; + consumed_buffer; + std::unordered_map queue_info_map; static std::mutex mutex; static MockQueue &GetMockQueue() { static MockQueue mock_queue; @@ -232,25 +233,25 @@ std::mutex MockQueue::mutex; StreamingStatus MockProducer::CreateTransferChannel() { std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); - mock_queue.message_buffer_[channel_info_.channel_id] = - std::make_shared>(500); - mock_queue.consumed_buffer_[channel_info_.channel_id] = - std::make_shared>(500); + mock_queue.message_bffer[channel_info_.channel_id] = + std::make_shared>(10000); + mock_queue.consumed_buffer[channel_info_.channel_id] = + std::make_shared>(10000); return StreamingStatus::OK; } StreamingStatus MockProducer::DestroyTransferChannel() { std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); - mock_queue.message_buffer_.erase(channel_info_.channel_id); - mock_queue.consumed_buffer_.erase(channel_info_.channel_id); + mock_queue.message_bffer.erase(channel_info_.channel_id); + mock_queue.consumed_buffer.erase(channel_info_.channel_id); return StreamingStatus::OK; } StreamingStatus MockProducer::ProduceItemToChannel(uint8_t *data, uint32_t data_size) { std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); - auto &ring_buffer = mock_queue.message_buffer_[channel_info_.channel_id]; + auto &ring_buffer = mock_queue.message_bffer[channel_info_.channel_id]; if (ring_buffer->Full()) { return StreamingStatus::OutOfMemory; } @@ -260,6 +261,14 @@ StreamingStatus MockProducer::ProduceItemToChannel(uint8_t *data, uint32_t data_ item.data_size = data_size; std::memcpy(item.data.get(), data, data_size); ring_buffer->Push(item); + mock_queue.queue_info_map[channel_info_.channel_id].last_seq_id = item.seq_id; + return StreamingStatus::OK; +} + +StreamingStatus MockProducer::RefreshChannelInfo() { + MockQueue &mock_queue = MockQueue::GetMockQueue(); + channel_info_.queue_info.consumed_seq_id = + mock_queue.queue_info_map[channel_info_.channel_id].consumed_seq_id; return StreamingStatus::OK; } @@ -269,16 +278,16 @@ StreamingStatus MockConsumer::ConsumeItemFromChannel(uint64_t &offset_id, uint8_ std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); auto &channel_id = channel_info_.channel_id; - if (mock_queue.message_buffer_.find(channel_id) == mock_queue.message_buffer_.end()) { + if (mock_queue.message_bffer.find(channel_id) == mock_queue.message_bffer.end()) { return StreamingStatus::NoSuchItem; } - if (mock_queue.message_buffer_[channel_id]->Empty()) { + if (mock_queue.message_bffer[channel_id]->Empty()) { return StreamingStatus::NoSuchItem; } - MockQueueItem item = mock_queue.message_buffer_[channel_id]->Front(); - mock_queue.message_buffer_[channel_id]->Pop(); - mock_queue.consumed_buffer_[channel_id]->Push(item); + MockQueueItem item = mock_queue.message_bffer[channel_id]->Front(); + mock_queue.message_bffer[channel_id]->Pop(); + mock_queue.consumed_buffer[channel_id]->Push(item); offset_id = item.seq_id; data = item.data.get(); data_size = item.data_size; @@ -289,10 +298,18 @@ StreamingStatus MockConsumer::NotifyChannelConsumed(uint64_t offset_id) { std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); auto &channel_id = channel_info_.channel_id; - auto &ring_buffer = mock_queue.consumed_buffer_[channel_id]; + auto &ring_buffer = mock_queue.consumed_buffer[channel_id]; while (!ring_buffer->Empty() && ring_buffer->Front().seq_id <= offset_id) { ring_buffer->Pop(); } + mock_queue.queue_info_map[channel_id].consumed_seq_id = offset_id; + return StreamingStatus::OK; +} + +StreamingStatus MockConsumer::RefreshChannelInfo() { + MockQueue &mock_queue = MockQueue::GetMockQueue(); + channel_info_.queue_info.last_seq_id = + mock_queue.queue_info_map[channel_info_.channel_id].last_seq_id; return StreamingStatus::OK; } diff --git a/streaming/src/channel.h b/streaming/src/channel.h index 1e367b52e..6f0fbe0e2 100644 --- a/streaming/src/channel.h +++ b/streaming/src/channel.h @@ -51,11 +51,13 @@ struct ConsumerChannelInfo { StreamingQueueInfo queue_info; - uint64_t last_queue_item_delay; - uint64_t last_queue_item_latency; - uint64_t last_queue_target_diff; - uint64_t get_queue_item_times; + uint64_t last_queue_item_delay = 0; + uint64_t last_queue_item_latency = 0; + uint64_t last_queue_target_diff = 0; + uint64_t get_queue_item_times = 0; ActorID actor_id; + // Total count of notify request. + uint64_t notify_cnt = 0; }; /// Two types of channel are presented: @@ -162,7 +164,7 @@ class MockProducer : public ProducerChannel { return StreamingStatus::OK; } - StreamingStatus RefreshChannelInfo() override { return StreamingStatus::OK; } + StreamingStatus RefreshChannelInfo() override; StreamingStatus ProduceItemToChannel(uint8_t *data, uint32_t data_size) override; @@ -182,7 +184,7 @@ class MockConsumer : public ConsumerChannel { uint64_t checkpoint_offset) override { return StreamingStatus::OK; } - StreamingStatus RefreshChannelInfo() override { return StreamingStatus::OK; } + StreamingStatus RefreshChannelInfo() override; StreamingStatus ConsumeItemFromChannel(uint64_t &offset_id, uint8_t *&data, uint32_t &data_size, uint32_t timeout) override; StreamingStatus NotifyChannelConsumed(uint64_t offset_id) override; diff --git a/streaming/src/config/streaming_config.cc b/streaming/src/config/streaming_config.cc index 1baeb4849..8fa21dc0c 100644 --- a/streaming/src/config/streaming_config.cc +++ b/streaming/src/config/streaming_config.cc @@ -37,6 +37,21 @@ void StreamingConfig::FromProto(const uint8_t *data, uint32_t size) { if (config.empty_message_interval() != 0) { SetEmptyMessageTimeInterval(config.empty_message_interval()); } + if (config.flow_control_type() != proto::FlowControlType::UNKNOWN_FLOW_CONTROL_TYPE) { + SetFlowControlType(config.flow_control_type()); + } + if (config.writer_consumed_step() != 0) { + SetWriterConsumedStep(config.writer_consumed_step()); + } + if (config.reader_consumed_step() != 0) { + SetReaderConsumedStep(config.reader_consumed_step()); + } + if (config.event_driven_flow_control_interval()) { + SetReaderConsumedStep(config.event_driven_flow_control_interval()); + } + STREAMING_CHECK(writer_consumed_step_ >= reader_consumed_step_) + << "Writer consuemd step " << writer_consumed_step_ + << "can not be smaller then reader consumed step " << reader_consumed_step_; } uint32_t StreamingConfig::GetRingBufferCapacity() const { return ring_buffer_capacity_; } diff --git a/streaming/src/config/streaming_config.h b/streaming/src/config/streaming_config.h index 592d885ff..add9a8d56 100644 --- a/streaming/src/config/streaming_config.h +++ b/streaming/src/config/streaming_config.h @@ -33,6 +33,17 @@ class StreamingConfig { std::string task_job_id_ = JobID::Nil().Hex(); + // Default flow control type is unconsumed sequence flow control. More detail + // introducation and implemention in ray/streaming/src/flow_control.h. + streaming::proto::FlowControlType flow_control_type_ = + streaming::proto::FlowControlType::UnconsumedSeqFlowControl; + + // Default writer and reader consumed step. + uint32_t writer_consumed_step_ = 1000; + uint32_t reader_consumed_step_ = 100; + + uint32_t event_driven_flow_control_interval_ = 1; + public: void FromProto(const uint8_t *, uint32_t size); @@ -46,6 +57,12 @@ class StreamingConfig { DECL_GET_SET_PROPERTY(uint32_t, EmptyMessageTimeInterval, empty_message_time_interval_) DECL_GET_SET_PROPERTY(streaming::proto::OperatorType, OperatorType, operator_type_) DECL_GET_SET_PROPERTY(const std::string &, JobName, job_name_) + DECL_GET_SET_PROPERTY(uint32_t, WriterConsumedStep, writer_consumed_step_) + DECL_GET_SET_PROPERTY(uint32_t, ReaderConsumedStep, reader_consumed_step_) + DECL_GET_SET_PROPERTY(streaming::proto::FlowControlType, FlowControlType, + flow_control_type_) + DECL_GET_SET_PROPERTY(uint32_t, EventDrivenFlowControlInterval, + event_driven_flow_control_interval_) uint32_t GetRingBufferCapacity() const; /// Note(lingxuan.zlx), RingBufferCapacity's valid range is from 1 to diff --git a/streaming/src/data_reader.cc b/streaming/src/data_reader.cc index 71afdcc8b..f53b2152b 100644 --- a/streaming/src/data_reader.cc +++ b/streaming/src/data_reader.cc @@ -199,10 +199,9 @@ StreamingStatus DataReader::GetMergedMessageBundle(std::shared_ptr & StreamingStatus DataReader::GetBundle(const uint32_t timeout_ms, std::shared_ptr &message) { - // Notify consumed every item in this mode. + // Notify upstream that last fetched item has been consumed. if (last_fetched_queue_item_) { - NotifyConsumedItem(channel_info_map_[last_fetched_queue_item_->from], - last_fetched_queue_item_->seq_id); + NotifyConsumed(last_fetched_queue_item_); } /// DataBundle will be returned to the upper layer in the following cases: @@ -246,7 +245,7 @@ StreamingStatus DataReader::GetBundle(const uint32_t timeout_ms, RETURN_IF_NOT_OK(GetMergedMessageBundle(message, is_valid_break)); if (!is_valid_break) { empty_bundle_cnt++; - NotifyConsumedItem(channel_info_map_[message->from], message->seq_id); + NotifyConsumed(message); } } last_message_latency_ += current_time_ms() - start_time; @@ -282,6 +281,37 @@ void DataReader::Stop() { runtime_context_->SetRuntimeStatus(RuntimeStatus::Interrupted); } +void DataReader::NotifyConsumed(std::shared_ptr &message) { + auto &channel_info = channel_info_map_[message->from]; + auto &queue_info = channel_info.queue_info; + channel_info.notify_cnt++; + if (queue_info.target_seq_id <= message->seq_id) { + NotifyConsumedItem(channel_info, message->seq_id); + + channel_map_[channel_info.channel_id]->RefreshChannelInfo(); + if (queue_info.last_seq_id != QUEUE_INVALID_SEQ_ID) { + uint64_t original_target_seq_id = queue_info.target_seq_id; + queue_info.target_seq_id = std::min( + queue_info.last_seq_id, + message->seq_id + runtime_context_->GetConfig().GetReaderConsumedStep()); + channel_info.last_queue_target_diff = + queue_info.target_seq_id - original_target_seq_id; + } else { + STREAMING_LOG(WARNING) << "[Reader] [QueueInfo] channel id " << message->from + << ", last seq id " << queue_info.last_seq_id; + } + STREAMING_LOG(DEBUG) << "[Reader] [Consumed] Trigger notify consumed" + << ", channel id => " << message->from << ", last seq id => " + << queue_info.last_seq_id << ", target seq id => " + << queue_info.target_seq_id << ", consumed seq id => " + << message->seq_id << ", last message id => " + << message->meta->GetLastMessageId() << ", bundle type => " + << static_cast(message->meta->GetBundleType()) + << ", last message bundle ts => " + << message->meta->GetMessageBundleTs(); + } +} + bool StreamingReaderMsgPtrComparator::operator()(const std::shared_ptr &a, const std::shared_ptr &b) { STREAMING_CHECK(a->meta); diff --git a/streaming/src/data_reader.h b/streaming/src/data_reader.h index 79793c41c..b1e94005b 100644 --- a/streaming/src/data_reader.h +++ b/streaming/src/data_reader.h @@ -98,11 +98,13 @@ class DataReader { /// Notify input queues to clear data whose seq id is equal or less than offset. /// It's used when checkpoint is done. - /// \param channel_info - /// \param offset - /// + /// \param channel_info consumer's channel info + /// \param offset consumed channel offset void NotifyConsumedItem(ConsumerChannelInfo &channel_info, uint64_t offset); + //// Notify message related channel to clear data. + void NotifyConsumed(std::shared_ptr &message); + private: /// Create channels and connect to all upstream. StreamingStatus InitChannel(); diff --git a/streaming/src/data_writer.cc b/streaming/src/data_writer.cc index 76a47c46f..ab929bf41 100644 --- a/streaming/src/data_writer.cc +++ b/streaming/src/data_writer.cc @@ -15,50 +15,6 @@ namespace ray { namespace streaming { -void DataWriter::WriterLoopForward() { - STREAMING_CHECK(RuntimeStatus::Running == runtime_context_->GetRuntimeStatus()); - while (true) { - int64_t min_passby_message_ts = std::numeric_limits::max(); - uint32_t empty_messge_send_count = 0; - - for (auto &output_queue : output_queue_ids_) { - if (RuntimeStatus::Running != runtime_context_->GetRuntimeStatus()) { - return; - } - ProducerChannelInfo &channel_info = channel_info_map_[output_queue]; - bool is_push_empty_message = false; - StreamingStatus write_status = - WriteChannelProcess(channel_info, &is_push_empty_message); - int64_t current_ts = current_time_ms(); - if (StreamingStatus::OK == write_status) { - channel_info.message_pass_by_ts = current_ts; - if (is_push_empty_message) { - min_passby_message_ts = - std::min(channel_info.message_pass_by_ts, min_passby_message_ts); - empty_messge_send_count++; - } - } else if (StreamingStatus::FullChannel == write_status) { - } else { - if (StreamingStatus::EmptyRingBuffer != write_status) { - STREAMING_LOG(DEBUG) << "write buffer status => " - << static_cast(write_status) - << ", is push empty message => " << is_push_empty_message; - } - } - } - - if (empty_messge_send_count == output_queue_ids_.size()) { - // Sleep if empty message was sent in all channel. - uint64_t sleep_time_ = current_time_ms() - min_passby_message_ts; - // Sleep_time can be bigger than time interval because of network jitter. - if (sleep_time_ <= runtime_context_->GetConfig().GetEmptyMessageTimeInterval()) { - std::this_thread::sleep_for(std::chrono::milliseconds( - runtime_context_->GetConfig().GetEmptyMessageTimeInterval() - sleep_time_)); - } - } - } -} - StreamingStatus DataWriter::WriteChannelProcess(ProducerChannelInfo &channel_info, bool *is_empty_message) { // No message in buffer, empty message will be sent to downstream queue. @@ -100,6 +56,8 @@ void DataWriter::Run() { // Enable empty message timer after writer running. empty_message_thread_ = std::make_shared(&DataWriter::EmptyMessageTimerCallback, this); + flow_control_thread_ = + std::make_shared(&DataWriter::FlowControlTimer, this); } /// Since every memory ring buffer's size is limited, when the writing buffer is @@ -192,6 +150,16 @@ StreamingStatus DataWriter::Init(const std::vector &queue_id_vec, return status; } } + + switch (runtime_context_->GetConfig().GetFlowControlType()) { + case proto::FlowControlType::UnconsumedSeqFlowControl: + flow_controller_ = std::make_shared( + channel_map_, runtime_context_->GetConfig().GetWriterConsumedStep()); + break; + default: + flow_controller_ = std::make_shared(); + break; + } // Register empty event and user event to event server. event_service_ = std::make_shared(); event_service_->Register( @@ -199,6 +167,8 @@ StreamingStatus DataWriter::Init(const std::vector &queue_id_vec, std::bind(&DataWriter::SendEmptyToChannel, this, std::placeholders::_1)); event_service_->Register(EventType::UserEvent, std::bind(&DataWriter::WriteAllToChannel, this, std::placeholders::_1)); + event_service_->Register(EventType::FlowEvent, std::bind(&DataWriter::WriteAllToChannel, + this, std::placeholders::_1)); runtime_context_->SetRuntimeStatus(RuntimeStatus::Running); return StreamingStatus::OK; @@ -219,6 +189,10 @@ DataWriter::~DataWriter() { STREAMING_LOG(INFO) << "Empty message thread waiting for join"; empty_message_thread_->join(); } + if (flow_control_thread_->joinable()) { + STREAMING_LOG(INFO) << "FlowControl timer thread waiting for join"; + flow_control_thread_->join(); + } int user_event_count = 0; int empty_event_count = 0; int flow_control_event_count = 0; @@ -358,6 +332,16 @@ bool DataWriter::WriteAllToChannel(ProducerChannelInfo *info) { if (RuntimeStatus::Running != runtime_context_->GetRuntimeStatus()) { return false; } + // Stop to write remained messages to channel if channel has been blocked by + // flow control. + if (channel_info.flow_control) { + break; + } + // Check this channel is blocked by flow control or not. + if (flow_controller_->ShouldFlowControl(channel_info)) { + channel_info.flow_control = true; + break; + } uint64_t ring_buffer_remain = channel_info.writer_ring_buffer->Size(); StreamingStatus write_status = WriteBufferToChannel(channel_info, ring_buffer_remain); int64_t current_ts = current_time_ms(); @@ -365,11 +349,11 @@ bool DataWriter::WriteAllToChannel(ProducerChannelInfo *info) { channel_info.message_pass_by_ts = current_ts; } else if (StreamingStatus::FullChannel == write_status || StreamingStatus::OutOfMemory == write_status) { + channel_info.flow_control = true; ++channel_info.queue_full_cnt; STREAMING_LOG(DEBUG) << "FullChannel after writing to channel, queue_full_cnt:" << channel_info.queue_full_cnt; - // TODO(lingxuan.zlx): we should notify consumed status to channel when - // flow control is supported. + RefreshChannelAndNotifyConsumed(channel_info); } else if (StreamingStatus::EmptyRingBuffer != write_status) { STREAMING_LOG(INFO) << channel_info.channel_id << ":something wrong when WriteToQueue " @@ -447,5 +431,54 @@ void DataWriter::EmptyMessageTimerCallback() { } } +void DataWriter::RefreshChannelAndNotifyConsumed(ProducerChannelInfo &channel_info) { + // Refresh current downstream consumed seq id. + channel_map_[channel_info.channel_id]->RefreshChannelInfo(); + // Notify the consumed information to local channel. + NotifyConsumedItem(channel_info, channel_info.queue_info.consumed_seq_id); +} + +void DataWriter::NotifyConsumedItem(ProducerChannelInfo &channel_info, uint32_t offset) { + if (offset > channel_info.current_seq_id) { + STREAMING_LOG(WARNING) << "Can not notify consumed this offset " << offset + << " that's out of range, max seq id " + << channel_info.current_seq_id; + } else { + channel_map_[channel_info.channel_id]->NotifyChannelConsumed(offset); + } +} + +void DataWriter::FlowControlTimer() { + std::chrono::milliseconds MockTimer( + runtime_context_->GetConfig().GetEventDrivenFlowControlInterval()); + while (true) { + if (runtime_context_->GetRuntimeStatus() != RuntimeStatus::Running) { + return; + } + for (const auto &output_queue : output_queue_ids_) { + if (runtime_context_->GetRuntimeStatus() != RuntimeStatus::Running) { + return; + } + ProducerChannelInfo &channel_info = channel_info_map_[output_queue]; + if (!channel_info.flow_control) { + continue; + } + if (!flow_controller_->ShouldFlowControl(channel_info)) { + channel_info.flow_control = false; + Event event{&channel_info, EventType::FlowEvent, + channel_info.writer_ring_buffer->IsFull()}; + event_service_->Push(event); + ++channel_info.flow_control_cnt; + } + } + std::this_thread::sleep_for(MockTimer); + } +} + +void DataWriter::GetOffsetInfo( + std::unordered_map *&offset_map) { + offset_map = &channel_info_map_; +} + } // namespace streaming } // namespace ray diff --git a/streaming/src/data_writer.h b/streaming/src/data_writer.h index e4191bca8..6682e4311 100644 --- a/streaming/src/data_writer.h +++ b/streaming/src/data_writer.h @@ -10,6 +10,7 @@ #include "channel.h" #include "config/streaming_config.h" #include "event_service.h" +#include "flow_control.h" #include "message/message_bundle.h" #include "runtime_context.h" @@ -30,6 +31,40 @@ namespace streaming { /// accordingly. It will sleep for a short interval to save cpu if all ring /// buffers have no data in that moment. class DataWriter { + public: + explicit DataWriter(std::shared_ptr &runtime_context); + virtual ~DataWriter(); + + /// Streaming writer client initialization. + /// \param queue_id_vec queue id vector + /// \param channel_message_id_vec channel seq id is related with message checkpoint + /// \param queue_size queue size (memory size not length) + StreamingStatus Init(const std::vector &channel_ids, + const std::vector &actor_ids, + const std::vector &channel_message_id_vec, + const std::vector &queue_size_vec); + + /// To increase throughout, we employed an output buffer for message transformation, + /// which means we merge a lot of message to a message bundle and no message will be + /// pushed into queue directly util daemon thread does this action. + /// Additionally, writing will block when buffer ring is full intentionly. + /// \param q_id, destination channel id + /// \param data, pointer of raw data + /// \param data_size, raw data size + /// \param message_type + /// \return message seq iq + uint64_t WriteMessageToBufferRing( + const ObjectID &q_id, uint8_t *data, uint32_t data_size, + StreamingMessageType message_type = StreamingMessageType::Message); + + void Run(); + + void Stop(); + + /// Get offset information about channels for checkpoint. + /// \param offset_map (return value) + void GetOffsetInfo(std::unordered_map *&offset_map); + private: bool IsMessageAvailableInBuffer(ProducerChannelInfo &channel_info); @@ -42,16 +77,6 @@ class DataWriter { StreamingStatus WriteBufferToChannel(ProducerChannelInfo &channel_info, uint64_t &buffer_remain); - /// Start the loop forward thread for collecting messages from all channels. - /// Invoking stack: - /// WriterLoopForward - /// -- WriteChannelProcess - /// -- WriteBufferToChannel - /// -- CollectFromRingBuffer - /// -- WriteTransientBufferToChannel - /// -- WriteEmptyMessage(if WriteChannelProcess return empty state) - void WriterLoopForward(); - /// Push empty message when no valid message or bundle was produced each time /// interval. /// \param channel_info @@ -79,42 +104,25 @@ class DataWriter { void EmptyMessageTimerCallback(); - public: - explicit DataWriter(std::shared_ptr &runtime_context); - virtual ~DataWriter(); + /// Notify channel consumed refreshing downstream queue stats. + void RefreshChannelAndNotifyConsumed(ProducerChannelInfo &channel_info); - /// Streaming writer client initialization. - /// \param queue_id_vec queue id vector - /// \param channel_message_id_vec channel seq id is related with message checkpoint - /// \param queue_size queue size (memory size not length) - StreamingStatus Init(const std::vector &channel_ids, - const std::vector &actor_ids, - const std::vector &channel_message_id_vec, - const std::vector &queue_size_vec); + /// Notify channel consumed by given offset. + void NotifyConsumedItem(ProducerChannelInfo &channel_info, uint32_t offset); - /// To increase throughout, we employed an output buffer for message transformation, - /// which means we merge a lot of message to a message bundle and no message will be - /// pushed into queue directly util daemon thread does this action. - /// Additionally, writing will block when buffer ring is full intentionly. - /// \param q_id - /// \param data - /// \param data_size - /// \param message_type - /// \return message seq iq - uint64_t WriteMessageToBufferRing( - const ObjectID &q_id, uint8_t *data, uint32_t data_size, - StreamingMessageType message_type = StreamingMessageType::Message); - - void Run(); - - void Stop(); + void FlowControlTimer(); private: std::shared_ptr event_service_; std::shared_ptr empty_message_thread_; + + std::shared_ptr flow_control_thread_; // One channel have unique identity. std::vector output_queue_ids_; + // Flow controller makes a decision when it's should be blocked and avoid + // unnecessary overflow. + std::shared_ptr flow_controller_; protected: std::unordered_map channel_info_map_; diff --git a/streaming/src/flow_control.cc b/streaming/src/flow_control.cc new file mode 100644 index 000000000..77cde8128 --- /dev/null +++ b/streaming/src/flow_control.cc @@ -0,0 +1,35 @@ +#include "flow_control.h" + +namespace ray { +namespace streaming { + +UnconsumedSeqFlowControl::UnconsumedSeqFlowControl( + std::unordered_map> &channel_map, + uint32_t step) + : channel_map_(channel_map), consumed_step_(step){}; + +bool UnconsumedSeqFlowControl::ShouldFlowControl(ProducerChannelInfo &channel_info) { + auto &queue_info = channel_info.queue_info; + if (queue_info.target_seq_id <= channel_info.current_seq_id) { + channel_map_[channel_info.channel_id]->RefreshChannelInfo(); + // Target seq id is maximum upper limit in current condition. + channel_info.queue_info.target_seq_id = + channel_info.queue_info.consumed_seq_id + consumed_step_; + STREAMING_LOG(DEBUG) << "Flow control stop writing to downstream, current max id => " + << channel_info.current_seq_id << ", target seq id => " + << queue_info.target_seq_id << ", consumed_id => " + << queue_info.consumed_seq_id << ", q id => " + << channel_info.channel_id + << ". if this log keeps printing, it means something wrong " + "with queue's info API, or downstream node is not " + "consuming data."; + // Double check after refreshing if target seq id is changed. + if (queue_info.target_seq_id <= channel_info.current_seq_id) { + return true; + } + } + return false; +} +} // namespace streaming + +} // namespace ray diff --git a/streaming/src/flow_control.h b/streaming/src/flow_control.h new file mode 100644 index 000000000..54c194492 --- /dev/null +++ b/streaming/src/flow_control.h @@ -0,0 +1,45 @@ +#ifndef RAY_STREAMING_FLOW_CONTROL_H +#define RAY_STREAMING_FLOW_CONTROL_H +#include "channel.h" + +namespace ray { +namespace streaming { +class ProducerTransfer; +/// We devise a flow control system in queue channel, and that's called flow +/// control by unconsumed seq. Upstream worker will detect consumer statistics via +/// api so it can keep fixed length messages in this process, which makes a +/// continuous datastream in channel or on the transporting way, then downstream +/// can read them from channel immediately. +/// To debug or compare with theses flow control methods, we also support +/// no-flow-control that will do nothing in transporting. +class FlowControl { + public: + virtual ~FlowControl() = default; + virtual bool ShouldFlowControl(ProducerChannelInfo &channel_info) = 0; +}; + +class NoFlowControl : public FlowControl { + public: + bool ShouldFlowControl(ProducerChannelInfo &channel_info) { return false; } + ~NoFlowControl() = default; +}; + +class UnconsumedSeqFlowControl : public FlowControl { + public: + UnconsumedSeqFlowControl( + std::unordered_map> &channel_map, + uint32_t step); + ~UnconsumedSeqFlowControl() = default; + bool ShouldFlowControl(ProducerChannelInfo &channel_info); + + private: + /// NOTE(wanxing.wwx) Reference to channel_map_ variable in DataWriter. + /// Flow-control is checked in FlowControlThread, so channel_map_ is accessed + /// in multithread situation. Especially, while rescaling, channel_map_ maybe + /// changed. But for now, FlowControlThread is stopped before rescaling. + std::unordered_map> &channel_map_; + uint32_t consumed_step_; +}; +} // namespace streaming +} // namespace ray +#endif // RAY_STREAMING_FLOW_CONTROL_H diff --git a/streaming/src/protobuf/streaming.proto b/streaming/src/protobuf/streaming.proto index 2b4a9a4cd..caf2cc865 100644 --- a/streaming/src/protobuf/streaming.proto +++ b/streaming/src/protobuf/streaming.proto @@ -11,6 +11,12 @@ enum OperatorType { SINK = 3; } +enum FlowControlType { + UNKNOWN_FLOW_CONTROL_TYPE = 0; + UnconsumedSeqFlowControl = 1; + NoFlowControl = 2; +} + // all string in this message is ASCII string message StreamingConfig { string job_name = 1; @@ -20,4 +26,8 @@ message StreamingConfig { OperatorType role = 5; uint32 ring_buffer_capacity = 6; uint32 empty_message_interval = 7; + FlowControlType flow_control_type = 8; + uint32 writer_consumed_step = 9; + uint32 reader_consumed_step = 10; + uint32 event_driven_flow_control_interval = 11; } diff --git a/streaming/src/test/mock_transfer_tests.cc b/streaming/src/test/mock_transfer_tests.cc index b0f133a90..f5cfcecd6 100644 --- a/streaming/src/test/mock_transfer_tests.cc +++ b/streaming/src/test/mock_transfer_tests.cc @@ -37,7 +37,7 @@ TEST(StreamingMockTransfer, mock_produce_consume) { class StreamingTransferTest : public ::testing::Test { public: StreamingTransferTest() { - std::shared_ptr runtime_context(new RuntimeContext()); + runtime_context = std::make_shared(); runtime_context->MarkMockTest(); writer = std::make_shared(runtime_context); reader = std::make_shared(runtime_context); @@ -64,6 +64,7 @@ class StreamingTransferTest : public ::testing::Test { std::shared_ptr writer; std::shared_ptr reader; std::vector queue_vec; + std::shared_ptr runtime_context; }; TEST_F(StreamingTransferTest, exchange_single_channel_test) { @@ -130,6 +131,58 @@ TEST_F(StreamingTransferTest, exchange_consumed_test) { write_thread.join(); } +TEST_F(StreamingTransferTest, flow_control_test) { + InitTransfer(); + writer->Run(); + uint32_t data_size = 8196; + std::shared_ptr data(new uint8_t[data_size]); + auto func = [data, data_size](int index) { std::fill_n(data.get(), data_size, index); }; + + size_t num = 10000; + std::thread write_thread([this, data, data_size, &func, num]() { + for (size_t i = 0; i < num; ++i) { + func(i); + writer->WriteMessageToBufferRing(queue_vec[0], data.get(), data_size); + } + }); + std::unordered_map *writer_offset_info = nullptr; + std::unordered_map *reader_offset_info = nullptr; + writer->GetOffsetInfo(writer_offset_info); + reader->GetOffsetInfo(reader_offset_info); + uint32_t writer_step = runtime_context->GetConfig().GetWriterConsumedStep(); + uint32_t reader_step = runtime_context->GetConfig().GetReaderConsumedStep(); + uint64_t &writer_current_seq_id = (*writer_offset_info)[queue_vec[0]].current_seq_id; + uint64_t &writer_current_message_id = + (*writer_offset_info)[queue_vec[0]].current_message_id; + uint64_t &reader_target_seq_id = + (*reader_offset_info)[queue_vec[0]].queue_info.target_seq_id; + while (writer_current_seq_id < writer_step) { + STREAMING_LOG(INFO) << "Writer currrent seq id " << writer_current_seq_id + << " message " << writer_current_message_id << " consumer step " + << writer_step; + std::this_thread::sleep_for( + std::chrono::milliseconds(StreamingConfig::TIME_WAIT_UINT)); + } + + std::list read_message_list; + while (read_message_list.size() < num) { + std::shared_ptr msg; + reader->GetBundle(5000, msg); + StreamingMessageBundlePtr bundle_ptr = StreamingMessageBundle::FromBytes(msg->data); + auto &message_list = bundle_ptr->GetMessageList(); + std::copy(message_list.begin(), message_list.end(), + std::back_inserter(read_message_list)); + ASSERT_GE(writer_step, writer_current_seq_id - msg->seq_id); + ASSERT_GE(msg->seq_id + reader_step, reader_target_seq_id); + } + int index = 0; + for (auto &message : read_message_list) { + func(index++); + EXPECT_EQ(std::memcmp(message->RawData(), data.get(), data_size), 0); + } + write_thread.join(); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();