From b14c56e599ea34dd88838c820151063b1aeba0e8 Mon Sep 17 00:00:00 2001 From: Ian Rodney Date: Tue, 25 Aug 2020 10:07:20 -0700 Subject: [PATCH] fix lint (#10315) --- streaming/src/channel.h | 4 +- streaming/src/queue/message.cc | 17 ++--- streaming/src/queue/message.h | 10 ++- streaming/src/queue/queue.cc | 22 +++--- streaming/src/queue/queue.h | 38 ++++----- streaming/src/queue/queue_handler.cc | 22 +++--- streaming/src/queue/queue_handler.h | 18 ++--- streaming/src/queue/queue_item.h | 17 ++++- streaming/src/test/mock_actor.cc | 85 +++++++++++++-------- streaming/src/test/queue_protobuf_tests.cc | 5 +- streaming/src/test/streaming_queue_tests.cc | 3 +- 11 files changed, 141 insertions(+), 100 deletions(-) diff --git a/streaming/src/channel.h b/streaming/src/channel.h index ee52c2a74..4bb46aa15 100644 --- a/streaming/src/channel.h +++ b/streaming/src/channel.h @@ -195,7 +195,9 @@ class MockConsumer : public ConsumerChannel { explicit MockConsumer(std::shared_ptr &transfer_config, ConsumerChannelInfo &c_channel_info) : ConsumerChannel(transfer_config, c_channel_info){}; - TransferCreationStatus CreateTransferChannel() override { return TransferCreationStatus::PullOk; } + TransferCreationStatus CreateTransferChannel() override { + return TransferCreationStatus::PullOk; + } StreamingStatus DestroyTransferChannel() override { return StreamingStatus::OK; } StreamingStatus ClearTransferCheckpoint(uint64_t checkpoint_id, uint64_t checkpoint_offset) override { diff --git a/streaming/src/queue/message.cc b/streaming/src/queue/message.cc index f7d4d7874..ff1edad2a 100644 --- a/streaming/src/queue/message.cc +++ b/streaming/src/queue/message.cc @@ -12,8 +12,7 @@ std::unique_ptr Message::ToBytes() { int64_t fbs_length = pboutput.length(); queue::protobuf::StreamingQueueMessageType type = Type(); - size_t total_len = - kItemHeaderSize + fbs_length; + size_t total_len = kItemHeaderSize + fbs_length; if (buffer_ != nullptr) { total_len += buffer_->Size(); } @@ -45,7 +44,6 @@ std::unique_ptr Message::ToBytes() { return buffer; } - void Message::FillMessageCommon(queue::protobuf::MessageCommon *common) { common->set_src_actor_id(actor_id_.Binary()); common->set_dst_actor_id(peer_actor_id_.Binary()); @@ -82,8 +80,9 @@ std::shared_ptr DataMessage::FromBytes(uint8_t *bytes) { /// Copy data and create a new buffer for streaming queue. std::shared_ptr buffer = std::make_shared(bytes, (size_t)length, true); - std::shared_ptr data_msg = std::make_shared( - src_actor_id, dst_actor_id, queue_id, seq_id, msg_id_start, msg_id_end, buffer, raw); + std::shared_ptr data_msg = + std::make_shared(src_actor_id, dst_actor_id, queue_id, seq_id, + msg_id_start, msg_id_end, buffer, raw); return data_msg; } @@ -209,10 +208,10 @@ std::shared_ptr PullResponseMessage::FromBytes(uint8_t *byt bool is_upstream_first_pull = message.is_upstream_first_pull(); STREAMING_LOG(INFO) << "src_actor_id:" << src_actor_id - << " dst_actor_id:" << dst_actor_id << " queue_id:" << queue_id - << " seq_id: " << seq_id << " msg_id: " << msg_id << " err_code:" - << queue::protobuf::StreamingQueueError_Name(err_code) - << " is_upstream_first_pull: " << is_upstream_first_pull; + << " dst_actor_id:" << dst_actor_id << " queue_id:" << queue_id + << " seq_id: " << seq_id << " msg_id: " << msg_id << " err_code:" + << queue::protobuf::StreamingQueueError_Name(err_code) + << " is_upstream_first_pull: " << is_upstream_first_pull; std::shared_ptr pull_rsp_msg = std::make_shared(src_actor_id, dst_actor_id, queue_id, seq_id, diff --git a/streaming/src/queue/message.h b/streaming/src/queue/message.h index 50c80a9fa..42b474af2 100644 --- a/streaming/src/queue/message.h +++ b/streaming/src/queue/message.h @@ -45,6 +45,7 @@ class Message { virtual void ToProtobuf(std::string *output) = 0; void FillMessageCommon(queue::protobuf::MessageCommon *common); + protected: ActorID actor_id_; ActorID peer_actor_id_; @@ -68,10 +69,13 @@ constexpr uint32_t kItemHeaderSize = kItemMetaHeaderSize + sizeof(uint64_t); class DataMessage : public Message { public: DataMessage(const ActorID &actor_id, const ActorID &peer_actor_id, ObjectID queue_id, - uint64_t seq_id, uint64_t msg_id_start, uint64_t msg_id_end, std::shared_ptr buffer, bool raw) - : Message(actor_id, peer_actor_id, queue_id, buffer), seq_id_(seq_id), + uint64_t seq_id, uint64_t msg_id_start, uint64_t msg_id_end, + std::shared_ptr buffer, bool raw) + : Message(actor_id, peer_actor_id, queue_id, buffer), + seq_id_(seq_id), msg_id_start_(msg_id_start), - msg_id_end_(msg_id_end),raw_(raw) {} + msg_id_end_(msg_id_end), + raw_(raw) {} virtual ~DataMessage() {} static std::shared_ptr FromBytes(uint8_t *bytes); diff --git a/streaming/src/queue/queue.cc b/streaming/src/queue/queue.cc index 308ac6b76..434420ca3 100644 --- a/streaming/src/queue/queue.cc +++ b/streaming/src/queue/queue.cc @@ -102,7 +102,8 @@ size_t Queue::PendingCount() { } Status WriterQueue::Push(uint64_t seq_id, uint8_t *buffer, uint32_t buffer_size, - uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end, bool raw) { + uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end, + bool raw) { if (IsPendingFull(buffer_size)) { return Status::OutOfMemory("Queue Push OutOfMemory"); } @@ -122,8 +123,8 @@ Status WriterQueue::Push(uint64_t seq_id, uint8_t *buffer, uint32_t buffer_size, void WriterQueue::Send() { while (!IsPendingEmpty()) { QueueItem item = PopPending(); - DataMessage msg(actor_id_, peer_actor_id_, queue_id_, item.SeqId(), item.MsgIdStart(), item.MsgIdEnd(), item.Buffer(), - item.IsRaw()); + DataMessage msg(actor_id_, peer_actor_id_, queue_id_, item.SeqId(), item.MsgIdStart(), + item.MsgIdEnd(), item.Buffer(), item.IsRaw()); std::unique_ptr buffer = msg.ToBytes(); STREAMING_CHECK(transport_ != nullptr); transport_->Send(std::move(buffer)); @@ -191,8 +192,10 @@ int WriterQueue::ResendItems(std::list::iterator start_iter, } void WriterQueue::FindItem( - uint64_t target_msg_id, std::function greater_callback, std::function less_callback, - std::function::iterator, uint64_t, uint64_t)> equal_callback) { + uint64_t target_msg_id, std::function greater_callback, + std::function less_callback, + std::function::iterator, uint64_t, uint64_t)> + equal_callback) { auto last_one = std::prev(watershed_iter_); bool last_item_too_small = last_one != buffer_queue_.end() && last_one->MsgIdEnd() < target_msg_id; @@ -243,10 +246,11 @@ void WriterQueue::OnPull( /// target_msg_id is too small. [this, &pull_msg, &callback]() { STREAMING_LOG(WARNING) << "Data lost."; - PullResponseMessage msg( - pull_msg->PeerActorId(), pull_msg->ActorId(), pull_msg->QueueId(), - QUEUE_INVALID_SEQ_ID, QUEUE_INVALID_SEQ_ID, - queue::protobuf::StreamingQueueError::DATA_LOST, is_upstream_first_pull_); + PullResponseMessage msg(pull_msg->PeerActorId(), pull_msg->ActorId(), + pull_msg->QueueId(), QUEUE_INVALID_SEQ_ID, + QUEUE_INVALID_SEQ_ID, + queue::protobuf::StreamingQueueError::DATA_LOST, + is_upstream_first_pull_); std::unique_ptr buffer = msg.ToBytes(); callback(std::move(buffer)); diff --git a/streaming/src/queue/queue.h b/streaming/src/queue/queue.h index 32c1355b5..b15c3cfe8 100644 --- a/streaming/src/queue/queue.h +++ b/streaming/src/queue/queue.h @@ -31,14 +31,18 @@ class Queue { /// \param actor_id, the actor id of upstream worker /// \param peer_actor_id, the actor id of downstream worker /// \param queue_id the unique identification of a pair of queues (upstream and - /// downstream). - /// \param size max size of the queue in bytes. + /// downstream). + /// \param size max size of the queue in bytes. /// \param transport /// transport to send items to peer. - Queue(const ActorID &actor_id, const ActorID &peer_actor_id, ObjectID queue_id, uint64_t size, std::shared_ptr transport) + Queue(const ActorID &actor_id, const ActorID &peer_actor_id, ObjectID queue_id, + uint64_t size, std::shared_ptr transport) : actor_id_(actor_id), peer_actor_id_(peer_actor_id), - queue_id_(queue_id), max_data_size_(size), data_size_(0), data_size_sent_(0) { + queue_id_(queue_id), + max_data_size_(size), + data_size_(0), + data_size_sent_(0) { buffer_queue_.push_back(InvalidQueueItem()); watershed_iter_ = buffer_queue_.begin(); } @@ -98,6 +102,7 @@ class Queue { inline ActorID GetActorID() { return actor_id_; } inline ActorID GetPeerActorID() { return peer_actor_id_; } inline ObjectID GetQueueID() { return queue_id_; } + protected: std::list buffer_queue_; std::list::iterator watershed_iter_; @@ -137,13 +142,12 @@ class WriterQueue : public Queue { is_resending_(false), is_upstream_first_pull_(true) {} - /// Push a continuous buffer into queue, the buffer consists of some messages packed by DataWriter. - /// \param data, the buffer address - /// \param data_size, buffer size - /// \param timestamp, the timestamp when the buffer pushed in - /// \param msg_id_start, the message id of the first message in the buffer - /// \param msg_id_end, the message id of the last message in the buffer - /// \param raw, whether this buffer is raw data, be True only in test + /// Push a continuous buffer into queue, the buffer consists of some messages packed by + /// DataWriter. \param data, the buffer address \param data_size, buffer size \param + /// timestamp, the timestamp when the buffer pushed in \param msg_id_start, the message + /// id of the first message in the buffer \param msg_id_end, the message id of the last + /// message in the buffer \param raw, whether this buffer is raw data, be True only in + /// test Status Push(uint64_t seq_id, uint8_t *buffer, uint32_t buffer_size, uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end, bool raw = false); @@ -201,11 +205,10 @@ class WriterQueue : public Queue { /// in the queue, the `less_callback` callback will be called; If the `target_msg_id` is /// found in the queue, the `found_callback` callback willbe called. /// \param target_msg_id, the target message id to be found. - void FindItem( - uint64_t target_msg_id, - std::function greater_callback, - std::function less_callback, - std::function::iterator, uint64_t, uint64_t)> equal_callback); + void FindItem(uint64_t target_msg_id, std::function greater_callback, + std::function less_callback, + std::function::iterator, uint64_t, uint64_t)> + equal_callback); private: ActorID actor_id_; @@ -231,7 +234,8 @@ class ReaderQueue : public Queue { /// NOTE: we do not restrict queue size of ReaderQueue ReaderQueue(const ObjectID &queue_id, const ActorID &actor_id, const ActorID &peer_actor_id, std::shared_ptr transport) - : Queue(actor_id, peer_actor_id, queue_id, std::numeric_limits::max(), transport), + : Queue(actor_id, peer_actor_id, queue_id, std::numeric_limits::max(), + transport), actor_id_(actor_id), peer_actor_id_(peer_actor_id), min_consumed_id_(QUEUE_INVALID_SEQ_ID), diff --git a/streaming/src/queue/queue_handler.cc b/streaming/src/queue/queue_handler.cc index 52cf48d78..40a6033d4 100644 --- a/streaming/src/queue/queue_handler.cc +++ b/streaming/src/queue/queue_handler.cc @@ -238,8 +238,8 @@ void UpstreamQueueMessageHandler::DispatchMessageInternal( } else if (msg->Type() == queue::protobuf::StreamingQueueMessageType::StreamingQueueCheckRspMsgType) { STREAMING_CHECK(false) << "Should not receive StreamingQueueCheckRspMsg"; - } else if (msg->Type() == - queue::protobuf::StreamingQueueMessageType::StreamingQueuePullRequestMsgType) { + } else if (msg->Type() == queue::protobuf::StreamingQueueMessageType:: + StreamingQueuePullRequestMsgType) { STREAMING_CHECK(callback) << "StreamingQueuePullRequestMsg " << " qid: " << msg->QueueId() << " actorid " << msg->ActorId() @@ -330,8 +330,7 @@ std::shared_ptr DownstreamQueueMessageHandler::CreateDownstreamQueu StreamingQueueStatus DownstreamQueueMessageHandler::PullQueue( const ObjectID &queue_id, uint64_t start_msg_id, bool &is_upstream_first_pull, uint64_t timeout_ms) { - STREAMING_LOG(INFO) << "PullQueue queue_id: " - << queue_id + STREAMING_LOG(INFO) << "PullQueue queue_id: " << queue_id << " start_msg_id: " << start_msg_id << " is_upstream_first_pull: " << is_upstream_first_pull; uint64_t start_time = current_time_ms(); @@ -397,8 +396,8 @@ void DownstreamQueueMessageHandler::DispatchMessageInternal( if (callback != nullptr) { callback(check_result); } - } else if (msg->Type() == - queue::protobuf::StreamingQueueMessageType::StreamingQueueResendDataMsgType) { + } else if (msg->Type() == queue::protobuf::StreamingQueueMessageType:: + StreamingQueueResendDataMsgType) { auto queue = downstream_queues_.find(msg->QueueId()); if (queue == downstream_queues_.end()) { std::shared_ptr data_msg = @@ -456,14 +455,14 @@ StreamingQueueStatus DownstreamQueueMessageHandler::PullPeerAsync( } std::shared_ptr result_msg = ParseMessage(result_buffer); - STREAMING_CHECK(result_msg->Type() == - queue::protobuf::StreamingQueueMessageType::StreamingQueuePullResponseMsgType); + STREAMING_CHECK( + result_msg->Type() == + queue::protobuf::StreamingQueueMessageType::StreamingQueuePullResponseMsgType); std::shared_ptr response_msg = std::dynamic_pointer_cast(result_msg); STREAMING_LOG(INFO) << "PullPeerAsync error: " - << queue::protobuf::StreamingQueueError_Name( - response_msg->Error()) + << queue::protobuf::StreamingQueueError_Name(response_msg->Error()) << " start_msg_id: " << start_msg_id; is_upstream_first_pull = response_msg->IsUpstreamFirstPull(); @@ -471,8 +470,7 @@ StreamingQueueStatus DownstreamQueueMessageHandler::PullPeerAsync( STREAMING_LOG(INFO) << "Set queue " << queue_id << " expect_seq_id to " << response_msg->SeqId(); return StreamingQueueStatus::OK; - } else if (response_msg->Error() == - queue::protobuf::StreamingQueueError::DATA_LOST) { + } else if (response_msg->Error() == queue::protobuf::StreamingQueueError::DATA_LOST) { return StreamingQueueStatus::DataLost; } else if (response_msg->Error() == queue::protobuf::StreamingQueueError::NO_VALID_DATA) { diff --git a/streaming/src/queue/queue_handler.h b/streaming/src/queue/queue_handler.h index b1d68ba18..bbb68eea8 100644 --- a/streaming/src/queue/queue_handler.h +++ b/streaming/src/queue/queue_handler.h @@ -40,8 +40,7 @@ class QueueMessageHandler { /// Construct a QueueMessageHandler instance. /// \param[in] actor_id actor id of current actor. QueueMessageHandler(const ActorID &actor_id) - : actor_id_(actor_id), queue_dummy_work_(queue_service_) { - } + : actor_id_(actor_id), queue_dummy_work_(queue_service_) {} virtual ~QueueMessageHandler() { Stop(); } @@ -116,8 +115,8 @@ class QueueMessageHandler { class UpstreamQueueMessageHandler : public QueueMessageHandler { public: /// Construct a UpstreamQueueMessageHandler instance. - UpstreamQueueMessageHandler(const ActorID &actor_id) : QueueMessageHandler(actor_id), - handler_service_dummy_worker_(handler_service_) { + UpstreamQueueMessageHandler(const ActorID &actor_id) + : QueueMessageHandler(actor_id), handler_service_dummy_worker_(handler_service_) { Start(); } /// Create a upstream queue. @@ -153,6 +152,7 @@ class UpstreamQueueMessageHandler : public QueueMessageHandler { const ActorID &actor_id); static std::shared_ptr GetService(); virtual void Start() override; + private: bool CheckQueueSync(const ObjectID &queue_ids); virtual void Stop() override; @@ -165,13 +165,13 @@ class UpstreamQueueMessageHandler : public QueueMessageHandler { std::thread handle_service_thread_; }; -/// DownstreamQueueMessageHandler holds and manages all downstream queues of current actor. +/// DownstreamQueueMessageHandler holds and manages all downstream queues of current +/// actor. class DownstreamQueueMessageHandler : public QueueMessageHandler { public: - DownstreamQueueMessageHandler(const ActorID &actor_id) - : QueueMessageHandler(actor_id) { - Start(); - } + DownstreamQueueMessageHandler(const ActorID &actor_id) : QueueMessageHandler(actor_id) { + Start(); + } /// Create a downstream queue. /// \param queue_id, queue id of the queue to be created. /// \param peer_actor_id, actor id of peer actor. diff --git a/streaming/src/queue/queue_item.h b/streaming/src/queue/queue_item.h index e01928442..f3954f346 100644 --- a/streaming/src/queue/queue_item.h +++ b/streaming/src/queue/queue_item.h @@ -33,14 +33,22 @@ class QueueItem { /// \param[in] raw whether the data content is raw bytes, only used in some tests. QueueItem(uint64_t seq_id, uint8_t *data, uint32_t data_size, uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end, bool raw = false) - : seq_id_(seq_id), msg_id_start_(msg_id_start), msg_id_end_(msg_id_end), + : seq_id_(seq_id), + msg_id_start_(msg_id_start), + msg_id_end_(msg_id_end), timestamp_(timestamp), raw_(raw), /*COPY*/ buffer_(std::make_shared(data, data_size, true)) {} QueueItem(uint64_t seq_id, std::shared_ptr buffer, - uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end, bool raw = false) - : seq_id_(seq_id), msg_id_start_(msg_id_start), msg_id_end_(msg_id_end), timestamp_(timestamp), raw_(raw), buffer_(buffer) {} + uint64_t timestamp, uint64_t msg_id_start, uint64_t msg_id_end, + bool raw = false) + : seq_id_(seq_id), + msg_id_start_(msg_id_start), + msg_id_end_(msg_id_end), + timestamp_(timestamp), + raw_(raw), + buffer_(buffer) {} QueueItem(std::shared_ptr data_msg) : seq_id_(data_msg->SeqId()), @@ -112,7 +120,8 @@ class QueueItem { class InvalidQueueItem : public QueueItem { public: - InvalidQueueItem() : QueueItem(QUEUE_INVALID_SEQ_ID, data_, 1, 0, QUEUE_INVALID_SEQ_ID, + InvalidQueueItem() + : QueueItem(QUEUE_INVALID_SEQ_ID, data_, 1, 0, QUEUE_INVALID_SEQ_ID, QUEUE_INVALID_SEQ_ID) {} private: diff --git a/streaming/src/test/mock_actor.cc b/streaming/src/test/mock_actor.cc index ce9b1bc86..f505496f2 100644 --- a/streaming/src/test/mock_actor.cc +++ b/streaming/src/test/mock_actor.cc @@ -260,12 +260,12 @@ class StreamingQueueReaderTestSuite : public StreamingQueueTestSuite { class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite { public: StreamingQueueUpStreamTestSuite(ActorID &peer_actor_id, std::vector queue_ids, - std::vector rescale_queue_ids) + std::vector rescale_queue_ids) : StreamingQueueTestSuite(peer_actor_id, queue_ids, rescale_queue_ids) { test_func_map_ = { {"pull_peer_async_test", std::bind(&StreamingQueueUpStreamTestSuite::PullPeerAsyncTest, this)}, - {"get_queue_test", + {"get_queue_test", std::bind(&StreamingQueueUpStreamTestSuite::GetQueueTest, this)}}; } @@ -274,11 +274,16 @@ class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite { std::this_thread::sleep_for(std::chrono::milliseconds(2000)); auto upstream_handler = ray::streaming::UpstreamQueueMessageHandler::GetService(); ObjectID &queue_id = queue_ids_[0]; - RayFunction async_call_func{ray::Language::PYTHON, - ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "reader_async_call_func", ""})}; - RayFunction sync_call_func{ray::Language::PYTHON, - ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "reader_sync_call_func", ""})}; - upstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, sync_call_func); + RayFunction async_call_func{ + ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::FromVector( + ray::Language::PYTHON, {"", "", "reader_async_call_func", ""})}; + RayFunction sync_call_func{ + ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::FromVector( + ray::Language::PYTHON, {"", "", "reader_sync_call_func", ""})}; + upstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, + sync_call_func); upstream_handler->CreateUpstreamQueue(queue_id, peer_actor_id_, 10240); STREAMING_LOG(INFO) << "IsQueueExist: " << upstream_handler->UpstreamQueueExists(queue_id); @@ -296,11 +301,16 @@ class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite { std::this_thread::sleep_for(std::chrono::milliseconds(2000)); auto upstream_handler = ray::streaming::UpstreamQueueMessageHandler::GetService(); ObjectID &queue_id = queue_ids_[0]; - RayFunction async_call_func{ray::Language::PYTHON, - ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "reader_async_call_func", ""})}; - RayFunction sync_call_func{ray::Language::PYTHON, - ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "reader_sync_call_func", ""})}; - upstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, sync_call_func); + RayFunction async_call_func{ + ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::FromVector( + ray::Language::PYTHON, {"", "", "reader_async_call_func", ""})}; + RayFunction sync_call_func{ + ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::FromVector( + ray::Language::PYTHON, {"", "", "reader_sync_call_func", ""})}; + upstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, + sync_call_func); std::shared_ptr queue = upstream_handler->CreateUpstreamQueue(queue_id, peer_actor_id_, 10240); STREAMING_LOG(INFO) << "IsQueueExist: " @@ -313,8 +323,10 @@ class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite { uint8_t data[100]; memset(data, msg_id, 100); STREAMING_LOG(INFO) << "Writer User Push item msg_id: " << msg_id; - ASSERT_TRUE( - queue->Push(msg_id/*seqid*/, data, 100, current_sys_time_ms(), msg_id, msg_id, true).ok()); + ASSERT_TRUE(queue + ->Push(msg_id /*seqid*/, data, 100, current_sys_time_ms(), msg_id, + msg_id, true) + .ok()); queue->Send(); } @@ -326,25 +338,30 @@ class StreamingQueueUpStreamTestSuite : public StreamingQueueTestSuite { class StreamingQueueDownStreamTestSuite : public StreamingQueueTestSuite { public: - StreamingQueueDownStreamTestSuite(ActorID peer_actor_id, std::vector queue_ids, - std::vector rescale_queue_ids) + StreamingQueueDownStreamTestSuite(ActorID peer_actor_id, + std::vector queue_ids, + std::vector rescale_queue_ids) : StreamingQueueTestSuite(peer_actor_id, queue_ids, rescale_queue_ids) { test_func_map_ = { {"pull_peer_async_test", std::bind(&StreamingQueueDownStreamTestSuite::PullPeerAsyncTest, this)}, - {"get_queue_test", + {"get_queue_test", std::bind(&StreamingQueueDownStreamTestSuite::GetQueueTest, this)}}; }; void GetQueueTest() { - auto downstream_handler = - ray::streaming::DownstreamQueueMessageHandler::GetService(); + auto downstream_handler = ray::streaming::DownstreamQueueMessageHandler::GetService(); ObjectID &queue_id = queue_ids_[0]; - RayFunction async_call_func{ray::Language::PYTHON, - ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "writer_async_call_func", ""})}; - RayFunction sync_call_func{ray::Language::PYTHON, - ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "writer_sync_call_func", ""})}; - downstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, sync_call_func); + RayFunction async_call_func{ + ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::FromVector( + ray::Language::PYTHON, {"", "", "writer_async_call_func", ""})}; + RayFunction sync_call_func{ + ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::FromVector( + ray::Language::PYTHON, {"", "", "writer_sync_call_func", ""})}; + downstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, + sync_call_func); downstream_handler->CreateDownstreamQueue(queue_id, peer_actor_id_); bool is_upstream_first_pull_ = false; @@ -357,14 +374,18 @@ class StreamingQueueDownStreamTestSuite : public StreamingQueueTestSuite { } void PullPeerAsyncTest() { - auto downstream_handler = - ray::streaming::DownstreamQueueMessageHandler::GetService(); + auto downstream_handler = ray::streaming::DownstreamQueueMessageHandler::GetService(); ObjectID &queue_id = queue_ids_[0]; - RayFunction async_call_func{ray::Language::PYTHON, - ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "writer_async_call_func", ""})}; - RayFunction sync_call_func{ray::Language::PYTHON, - ray::FunctionDescriptorBuilder::FromVector(ray::Language::PYTHON, {"", "", "writer_sync_call_func", ""})}; - downstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, sync_call_func); + RayFunction async_call_func{ + ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::FromVector( + ray::Language::PYTHON, {"", "", "writer_async_call_func", ""})}; + RayFunction sync_call_func{ + ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::FromVector( + ray::Language::PYTHON, {"", "", "writer_sync_call_func", ""})}; + downstream_handler->SetPeerActorID(queue_id, peer_actor_id_, async_call_func, + sync_call_func); std::shared_ptr queue = downstream_handler->CreateDownstreamQueue(queue_id, peer_actor_id_); @@ -511,7 +532,7 @@ class StreamingWorker { ray::FunctionDescriptorType::kPythonFunctionDescriptor); auto typed_descriptor = function_descriptor->As(); STREAMING_LOG(DEBUG) << "StreamingWorker::ExecuteTask " - << typed_descriptor->ToString(); + << typed_descriptor->ToString(); std::string func_name = typed_descriptor->FunctionName(); if (func_name == "init") { diff --git a/streaming/src/test/queue_protobuf_tests.cc b/streaming/src/test/queue_protobuf_tests.cc index 2b9bcadb0..8acf8eeb2 100644 --- a/streaming/src/test/queue_protobuf_tests.cc +++ b/streaming/src/test/queue_protobuf_tests.cc @@ -12,9 +12,10 @@ TEST(ProtoBufTest, MessageCommonTest) { ray::ActorID actor_id = ray::ActorID::Of(job_id, task_id, 0); ray::ActorID peer_actor_id = ray::ActorID::Of(job_id, task_id, 1); ObjectID queue_id = ray::ObjectID::FromRandom(); - + uint8_t data[128]; - std::shared_ptr buffer = std::make_shared(data, 128, true); + std::shared_ptr buffer = + std::make_shared(data, 128, true); DataMessage msg(actor_id, peer_actor_id, queue_id, 100, 1000, 2000, buffer, true); std::unique_ptr serilized_buffer = msg.ToBytes(); std::shared_ptr msg2 = DataMessage::FromBytes(serilized_buffer->Data()); diff --git a/streaming/src/test/streaming_queue_tests.cc b/streaming/src/test/streaming_queue_tests.cc index 6b95b1495..c2c678315 100644 --- a/streaming/src/test/streaming_queue_tests.cc +++ b/streaming/src/test/streaming_queue_tests.cc @@ -18,8 +18,7 @@ static int node_manager_port; class StreamingQueueTest : public StreamingQueueTestBase { public: - StreamingQueueTest() - : StreamingQueueTestBase(1, node_manager_port) {} + StreamingQueueTest() : StreamingQueueTestBase(1, node_manager_port) {} }; class StreamingWriterTest : public StreamingQueueTestBase {