diff --git a/streaming/src/channel.cc b/streaming/src/channel.cc index 974d42e37..421517359 100644 --- a/streaming/src/channel.cc +++ b/streaming/src/channel.cc @@ -5,11 +5,11 @@ namespace streaming { ProducerChannel::ProducerChannel(std::shared_ptr &transfer_config, ProducerChannelInfo &p_channel_info) - : transfer_config_(transfer_config), channel_info(p_channel_info) {} + : transfer_config_(transfer_config), channel_info_(p_channel_info) {} ConsumerChannel::ConsumerChannel(std::shared_ptr &transfer_config, ConsumerChannelInfo &c_channel_info) - : transfer_config_(transfer_config), channel_info(c_channel_info) {} + : transfer_config_(transfer_config), channel_info_(c_channel_info) {} StreamingQueueProducer::StreamingQueueProducer(std::shared_ptr &transfer_config, ProducerChannelInfo &p_channel_info) @@ -28,49 +28,49 @@ StreamingStatus StreamingQueueProducer::CreateTransferChannel() { uint64_t last_message_id_in_queue = 0; if (!last_message_id_in_queue) { - if (last_message_id_in_queue < channel_info.current_message_id) { + if (last_message_id_in_queue < channel_info_.current_message_id) { STREAMING_LOG(WARNING) << "last message id in queue : " << last_message_id_in_queue << " is less than message checkpoint loaded id : " - << channel_info.current_message_id - << ", an old queue object " << channel_info.channel_id + << channel_info_.current_message_id + << ", an old queue object " << channel_info_.channel_id << " was fond in store"; } - last_message_id_in_queue = channel_info.current_message_id; + last_message_id_in_queue = channel_info_.current_message_id; } if (queue_last_seq_id == static_cast(-1)) { queue_last_seq_id = 0; } - channel_info.current_seq_id = queue_last_seq_id; + channel_info_.current_seq_id = queue_last_seq_id; STREAMING_LOG(WARNING) << "existing last message id => " << last_message_id_in_queue << ", message id in channel => " - << channel_info.current_message_id << ", queue last seq id => " + << channel_info_.current_message_id << ", queue last seq id => " << queue_last_seq_id; - channel_info.message_last_commit_id = last_message_id_in_queue; + channel_info_.message_last_commit_id = last_message_id_in_queue; return StreamingStatus::OK; } StreamingStatus StreamingQueueProducer::CreateQueue() { - STREAMING_LOG(INFO) << "CreateQueue qid: " << channel_info.channel_id - << " data_size: " << channel_info.queue_size; + STREAMING_LOG(INFO) << "CreateQueue qid: " << channel_info_.channel_id + << " data_size: " << channel_info_.queue_size; auto upstream_handler = ray::streaming::UpstreamQueueMessageHandler::GetService(); - if (upstream_handler->UpstreamQueueExists(channel_info.channel_id)) { + if (upstream_handler->UpstreamQueueExists(channel_info_.channel_id)) { RAY_LOG(INFO) << "StreamingQueueWriter::CreateQueue duplicate!!!"; return StreamingStatus::OK; } - upstream_handler->SetPeerActorID(channel_info.channel_id, channel_info.actor_id); + upstream_handler->SetPeerActorID(channel_info_.channel_id, channel_info_.actor_id); queue_ = upstream_handler->CreateUpstreamQueue( - channel_info.channel_id, channel_info.actor_id, channel_info.queue_size); + channel_info_.channel_id, channel_info_.actor_id, channel_info_.queue_size); STREAMING_CHECK(queue_ != nullptr); std::vector queue_ids, failed_queues; - queue_ids.push_back(channel_info.channel_id); + queue_ids.push_back(channel_info_.channel_id); upstream_handler->WaitQueues(queue_ids, 10 * 1000, failed_queues); - STREAMING_LOG(INFO) << "q id => " << channel_info.channel_id << ", queue size => " - << channel_info.queue_size; + STREAMING_LOG(INFO) << "q id => " << channel_info_.channel_id << ", queue size => " + << channel_info_.queue_size; return StreamingStatus::OK; } @@ -84,6 +84,11 @@ StreamingStatus StreamingQueueProducer::ClearTransferCheckpoint( return StreamingStatus::OK; } +StreamingStatus StreamingQueueProducer::RefreshChannelInfo() { + channel_info_.queue_info.consumed_seq_id = queue_->GetMinConsumedSeqID(); + return StreamingStatus::OK; +} + StreamingStatus StreamingQueueProducer::NotifyChannelConsumed(uint64_t channel_offset) { queue_->SetQueueEvictionLimit(channel_offset); return StreamingStatus::OK; @@ -92,10 +97,10 @@ StreamingStatus StreamingQueueProducer::NotifyChannelConsumed(uint64_t channel_o StreamingStatus StreamingQueueProducer::ProduceItemToChannel(uint8_t *data, uint32_t data_size) { Status status = - PushQueueItem(channel_info.current_seq_id + 1, data, data_size, current_time_ms()); + PushQueueItem(channel_info_.current_seq_id + 1, data, data_size, current_time_ms()); if (status.code() != StatusCode::OK) { - STREAMING_LOG(DEBUG) << channel_info.channel_id << " => Queue is full" + STREAMING_LOG(DEBUG) << channel_info_.channel_id << " => Queue is full" << " meesage => " << status.message(); // Assume that only status OutOfMemory and OK are acceptable. @@ -113,7 +118,7 @@ StreamingStatus StreamingQueueProducer::ProduceItemToChannel(uint8_t *data, Status StreamingQueueProducer::PushQueueItem(uint64_t seq_id, uint8_t *data, uint32_t data_size, uint64_t timestamp) { STREAMING_LOG(INFO) << "StreamingQueueProducer::PushQueueItem:" - << " qid: " << channel_info.channel_id << " seq_id: " << seq_id + << " qid: " << channel_info_.channel_id << " seq_id: " << seq_id << " data_size: " << data_size; Status status = queue_->Push(seq_id, data, data_size, timestamp, false); if (status.IsOutOfMemory()) { @@ -142,18 +147,18 @@ StreamingQueueConsumer::~StreamingQueueConsumer() { StreamingStatus StreamingQueueConsumer::CreateTransferChannel() { auto downstream_handler = ray::streaming::DownstreamQueueMessageHandler::GetService(); - STREAMING_LOG(INFO) << "GetQueue qid: " << channel_info.channel_id - << " start_seq_id: " << channel_info.current_seq_id + 1; - if (downstream_handler->DownstreamQueueExists(channel_info.channel_id)) { + STREAMING_LOG(INFO) << "GetQueue qid: " << channel_info_.channel_id + << " start_seq_id: " << channel_info_.current_seq_id + 1; + if (downstream_handler->DownstreamQueueExists(channel_info_.channel_id)) { RAY_LOG(INFO) << "StreamingQueueReader::GetQueue duplicate!!!"; return StreamingStatus::OK; } - downstream_handler->SetPeerActorID(channel_info.channel_id, channel_info.actor_id); - STREAMING_LOG(INFO) << "Create ReaderQueue " << channel_info.channel_id - << " pull from start_seq_id: " << channel_info.current_seq_id + 1; - queue_ = downstream_handler->CreateDownstreamQueue(channel_info.channel_id, - channel_info.actor_id); + downstream_handler->SetPeerActorID(channel_info_.channel_id, channel_info_.actor_id); + STREAMING_LOG(INFO) << "Create ReaderQueue " << channel_info_.channel_id + << " pull from start_seq_id: " << channel_info_.current_seq_id + 1; + queue_ = downstream_handler->CreateDownstreamQueue(channel_info_.channel_id, + channel_info_.actor_id); return StreamingStatus::OK; } @@ -167,11 +172,16 @@ StreamingStatus StreamingQueueConsumer::ClearTransferCheckpoint( return StreamingStatus::OK; } +StreamingStatus StreamingQueueConsumer::RefreshChannelInfo() { + channel_info_.queue_info.last_seq_id = queue_->GetLastRecvSeqId(); + return StreamingStatus::OK; +} + StreamingStatus StreamingQueueConsumer::ConsumeItemFromChannel(uint64_t &offset_id, uint8_t *&data, uint32_t &data_size, uint32_t timeout) { - STREAMING_LOG(INFO) << "GetQueueItem qid: " << channel_info.channel_id; + STREAMING_LOG(INFO) << "GetQueueItem qid: " << channel_info_.channel_id; STREAMING_CHECK(queue_ != nullptr); QueueItem item = queue_->PopPendingBlockTimeout(timeout * 1000); if (item.SeqId() == QUEUE_INVALID_SEQ_ID) { @@ -186,7 +196,7 @@ StreamingStatus StreamingQueueConsumer::ConsumeItemFromChannel(uint64_t &offset_ offset_id = item.SeqId(); data_size = item.Buffer()->Size(); - STREAMING_LOG(DEBUG) << "GetQueueItem qid: " << channel_info.channel_id + STREAMING_LOG(DEBUG) << "GetQueueItem qid: " << channel_info_.channel_id << " seq_id: " << offset_id << " msg_id: " << item.MaxMsgId() << " data_size: " << data_size; return StreamingStatus::OK; @@ -222,9 +232,9 @@ std::mutex MockQueue::mutex; StreamingStatus MockProducer::CreateTransferChannel() { std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); - mock_queue.message_buffer_[channel_info.channel_id] = + mock_queue.message_buffer_[channel_info_.channel_id] = std::make_shared>(500); - mock_queue.consumed_buffer_[channel_info.channel_id] = + mock_queue.consumed_buffer_[channel_info_.channel_id] = std::make_shared>(500); return StreamingStatus::OK; } @@ -232,20 +242,20 @@ StreamingStatus MockProducer::CreateTransferChannel() { StreamingStatus MockProducer::DestroyTransferChannel() { std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); - mock_queue.message_buffer_.erase(channel_info.channel_id); - mock_queue.consumed_buffer_.erase(channel_info.channel_id); + mock_queue.message_buffer_.erase(channel_info_.channel_id); + mock_queue.consumed_buffer_.erase(channel_info_.channel_id); return StreamingStatus::OK; } StreamingStatus MockProducer::ProduceItemToChannel(uint8_t *data, uint32_t data_size) { std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); - auto &ring_buffer = mock_queue.message_buffer_[channel_info.channel_id]; + auto &ring_buffer = mock_queue.message_buffer_[channel_info_.channel_id]; if (ring_buffer->Full()) { return StreamingStatus::OutOfMemory; } MockQueueItem item; - item.seq_id = channel_info.current_seq_id + 1; + item.seq_id = channel_info_.current_seq_id + 1; item.data.reset(new uint8_t[data_size]); item.data_size = data_size; std::memcpy(item.data.get(), data, data_size); @@ -258,7 +268,7 @@ StreamingStatus MockConsumer::ConsumeItemFromChannel(uint64_t &offset_id, uint8_ uint32_t timeout) { std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); - auto &channel_id = channel_info.channel_id; + auto &channel_id = channel_info_.channel_id; if (mock_queue.message_buffer_.find(channel_id) == mock_queue.message_buffer_.end()) { return StreamingStatus::NoSuchItem; } @@ -278,7 +288,7 @@ StreamingStatus MockConsumer::ConsumeItemFromChannel(uint64_t &offset_id, uint8_ StreamingStatus MockConsumer::NotifyChannelConsumed(uint64_t offset_id) { std::unique_lock lock(MockQueue::mutex); MockQueue &mock_queue = MockQueue::GetMockQueue(); - auto &channel_id = channel_info.channel_id; + auto &channel_id = channel_info_.channel_id; auto &ring_buffer = mock_queue.consumed_buffer_[channel_id]; while (!ring_buffer->Empty() && ring_buffer->Front().seq_id <= offset_id) { ring_buffer->Pop(); diff --git a/streaming/src/channel.h b/streaming/src/channel.h index ce17ebb7e..1e367b52e 100644 --- a/streaming/src/channel.h +++ b/streaming/src/channel.h @@ -77,12 +77,13 @@ class ProducerChannel { virtual StreamingStatus DestroyTransferChannel() = 0; virtual StreamingStatus ClearTransferCheckpoint(uint64_t checkpoint_id, uint64_t checkpoint_offset) = 0; + virtual StreamingStatus RefreshChannelInfo() = 0; virtual StreamingStatus ProduceItemToChannel(uint8_t *data, uint32_t data_size) = 0; virtual StreamingStatus NotifyChannelConsumed(uint64_t channel_offset) = 0; protected: std::shared_ptr transfer_config_; - ProducerChannelInfo &channel_info; + ProducerChannelInfo &channel_info_; }; class ConsumerChannel { @@ -94,6 +95,7 @@ class ConsumerChannel { virtual StreamingStatus DestroyTransferChannel() = 0; virtual StreamingStatus ClearTransferCheckpoint(uint64_t checkpoint_id, uint64_t checkpoint_offset) = 0; + virtual StreamingStatus RefreshChannelInfo() = 0; virtual StreamingStatus ConsumeItemFromChannel(uint64_t &offset_id, uint8_t *&data, uint32_t &data_size, uint32_t timeout) = 0; @@ -101,7 +103,7 @@ class ConsumerChannel { protected: std::shared_ptr transfer_config_; - ConsumerChannelInfo &channel_info; + ConsumerChannelInfo &channel_info_; }; class StreamingQueueProducer : public ProducerChannel { @@ -113,6 +115,7 @@ class StreamingQueueProducer : public ProducerChannel { StreamingStatus DestroyTransferChannel() override; StreamingStatus ClearTransferCheckpoint(uint64_t checkpoint_id, uint64_t checkpoint_offset) override; + StreamingStatus RefreshChannelInfo() override; StreamingStatus ProduceItemToChannel(uint8_t *data, uint32_t data_size) override; StreamingStatus NotifyChannelConsumed(uint64_t offset_id) override; @@ -134,6 +137,7 @@ class StreamingQueueConsumer : public ConsumerChannel { StreamingStatus DestroyTransferChannel() override; StreamingStatus ClearTransferCheckpoint(uint64_t checkpoint_id, uint64_t checkpoint_offset) override; + StreamingStatus RefreshChannelInfo() override; StreamingStatus ConsumeItemFromChannel(uint64_t &offset_id, uint8_t *&data, uint32_t &data_size, uint32_t timeout) override; StreamingStatus NotifyChannelConsumed(uint64_t offset_id) override; @@ -158,6 +162,8 @@ class MockProducer : public ProducerChannel { return StreamingStatus::OK; } + StreamingStatus RefreshChannelInfo() override { return StreamingStatus::OK; } + StreamingStatus ProduceItemToChannel(uint8_t *data, uint32_t data_size) override; StreamingStatus NotifyChannelConsumed(uint64_t channel_offset) override { @@ -176,6 +182,7 @@ class MockConsumer : public ConsumerChannel { uint64_t checkpoint_offset) override { return StreamingStatus::OK; } + StreamingStatus RefreshChannelInfo() override { return StreamingStatus::OK; } StreamingStatus ConsumeItemFromChannel(uint64_t &offset_id, uint8_t *&data, uint32_t &data_size, uint32_t timeout) override; StreamingStatus NotifyChannelConsumed(uint64_t offset_id) override;