From 4e65dfc2aeae72f9c6d9f0d91e693d026f570c43 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 24 Apr 2016 19:09:02 -0700 Subject: [PATCH] support multiple object stores, part two --- src/objstore.cc | 242 ++++++++++++++++++++++++++++-------------------- 1 file changed, 141 insertions(+), 101 deletions(-) diff --git a/src/objstore.cc b/src/objstore.cc index 5f637aa64..b3e1e0b95 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -1,25 +1,44 @@ #include "objstore.h" #include -const size_t ObjStoreClient::CHUNK_SIZE = 8 * 1024; +const size_t ObjStoreService::CHUNK_SIZE = 8 * 1024; // this method needs to be protected by a objstore_lock_ -Status ObjStoreClient::upload_data_to(slice data, ObjRef objref, ObjStore::Stub& stub) { +// TODO(rkn): Make sure that we do not in fact need the objstore_lock_. We want multiple deliveries to be able to happen simultaneously. +void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { + ORCH_LOG(ORCH_DEBUG, "Objstore " << objstoreid_ << " is beginning to pull objref " << objref); ObjChunk chunk; ClientContext context; - AckReply reply; - std::unique_ptr > writer(stub.StreamObj(&context, &reply)); - const uint8_t* head = data.data; - for (size_t i = 0; i < data.len; i += CHUNK_SIZE) { - chunk.set_objref(objref); - chunk.set_totalsize(data.len); - chunk.set_data(head + i, std::min(CHUNK_SIZE, data.len - i)); - if (!writer->Write(chunk)) { - ORCH_LOG(ORCH_FATAL, "stream connection prematurely closed") - } + StreamObjToRequest stream_request; + stream_request.set_objref(objref); + std::unique_ptr > reader(stub.StreamObjTo(&context, stream_request)); + + size_t total_size = 0; + ObjHandle handle; + if (reader->Read(&chunk)) { + total_size = chunk.total_size(); + handle = alloc(objref, total_size); } - writer->WritesDone(); - return writer->Finish(); + size_t num_bytes = 0; + segmentpool_lock_.lock(); + uint8_t* data = segmentpool_->get_address(handle); + segmentpool_lock_.unlock(); + do { + if (num_bytes + chunk.data().size() > total_size) { + ORCH_LOG(ORCH_FATAL, "The reader attempted to stream too many bytes."); + } + std::memcpy(data, chunk.data().c_str(), chunk.data().size()); + data += chunk.data().size(); + num_bytes += chunk.data().size(); + } while (reader->Read(&chunk)); + Status status = reader->Finish(); // Right now we don't use the status. + + // finalize object + if (num_bytes != total_size) { + ORCH_LOG(ORCH_FATAL, "Streamed objref " << objref << ", but num_bytes != total_size"); + } + object_ready(objref, chunk.metadata_offset()); + ORCH_LOG(ORCH_DEBUG, "finished streaming data, objref was " << objref << " and size was " << num_bytes); } ObjStoreService::ObjStoreService(const std::string& objstore_address, std::shared_ptr scheduler_channel) @@ -44,15 +63,35 @@ ObjStore::Stub& ObjStoreService::get_objstore_stub(const std::string& objstore_a return *objstores_[objstore_address]; } -/* -Status ObjStoreService::DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) { - std::lock_guard objstores_lock(objstores_lock_); - ObjStore::Stub& stub = get_objstore_stub(request->objstore_address()); +Status ObjStoreService::StartDelivery(ServerContext* context, const StartDeliveryRequest* request, AckReply* reply) { + // TODO(rkn): We're pushing the delivery task onto a new thread so that this method can return immediately. This matters + // because the scheduler holds a lock while DeliverObj is being called. The correct solution is to make DeliverObj + // an asynchronous call (and similarly with the rest of the object store service methods). + std::string address = request->objstore_address(); ObjRef objref = request->objref(); - Status status = ObjStoreClient::upload_data_to(memory_[objref].ptr, objref, stub); - return status; + { + std::lock_guard memory_lock(memory_lock_); + if (objref >= memory_.size()) { + memory_.resize(objref + 1, std::make_pair(ObjHandle(), MemoryStatusType::NOT_PRESENT)); + } + if (memory_[objref].second == MemoryStatusType::NOT_PRESENT) { + } + else if (memory_[objref].second == MemoryStatusType::DEALLOCATED) { + ORCH_LOG(ORCH_FATAL, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED."); + } + else { + ORCH_LOG(ORCH_DEBUG, "Objstore " << objstoreid_ << " already has objref " << objref << " or it is already being shipped, so no need to pull it again."); + return Status::OK; + } + memory_[objref].second = MemoryStatusType::PRE_ALLOCED; + } + delivery_threads_.push_back(std::make_shared([this, address, objref]() { + std::lock_guard objstores_lock(objstores_lock_); + ObjStore::Stub& stub = get_objstore_stub(address); + pull_data_from(objref, stub); + })); + return Status::OK; } -*/ Status ObjStoreService::ObjStoreInfo(ServerContext* context, const ObjStoreInfoRequest* request, ObjStoreInfoReply* reply) { std::lock_guard memory_lock(memory_lock_); @@ -73,70 +112,59 @@ Status ObjStoreService::ObjStoreInfo(ServerContext* context, const ObjStoreInfoR return Status::OK; } -/* -Status ObjStoreService::StreamObj(ServerContext* context, ServerReader* reader, AckReply* reply) { - ORCH_LOG(ORCH_VERBOSE, "begin to stream data to object store " << objstoreid_); - memory_lock_.lock(); +Status ObjStoreService::StreamObjTo(ServerContext* context, const StreamObjToRequest* request, ServerWriter* writer) { + ORCH_LOG(ORCH_DEBUG, "begin to stream data from object store " << objstoreid_); ObjChunk chunk; - ObjRef objref = 0; - size_t totalsize = 0; - if (reader->Read(&chunk)) { - objref = chunk.objref(); - totalsize = chunk.totalsize(); - allocate_memory(objref, totalsize); + ObjRef objref = request->objref(); + memory_lock_.lock(); + if (objref >= memory_.size()) { + ORCH_LOG(ORCH_FATAL, "Objstore " << objstoreid_ << " is attempting to use objref " << objref << " in StreamObjTo, but this objref is not present in the object store."); } - size_t num_bytes = 0; - char* data = memory_[objref].ptr.data; - - do { - if (num_bytes + chunk.data().size() > totalsize) { - memory_lock_.unlock(); - return Status::CANCELLED; + if (memory_[objref].second != MemoryStatusType::READY) { + ORCH_LOG(ORCH_FATAL, "Objstore " << objstoreid_ << " is attempting to stream objref " << objref << ", but memory_[objref].second != MemoryStatusType::READY."); + } + ObjHandle handle = memory_[objref].first; + memory_lock_.unlock(); // TODO(rkn): Make sure we don't still need to hold on to this lock. + segmentpool_lock_.lock(); + const uint8_t* head = segmentpool_->get_address(handle); + segmentpool_lock_.unlock(); + size_t size = handle.size(); + for (size_t i = 0; i < size; i += CHUNK_SIZE) { + chunk.set_metadata_offset(handle.metadata_offset()); + chunk.set_total_size(size); + chunk.set_data(head + i, std::min(CHUNK_SIZE, size - i)); + if (!writer->Write(chunk)) { + ORCH_LOG(ORCH_FATAL, "stream connection prematurely closed") } - std::memcpy(data, chunk.data().c_str(), chunk.data().size()); - data += chunk.data().size(); - num_bytes += chunk.data().size(); - } while (reader->Read(&chunk)); - - ORCH_LOG(ORCH_VERBOSE, "finished streaming data, objref was " << objref << " and size was " << num_bytes); - - memory_lock_.unlock(); - - ClientContext objready_context; - ObjReadyRequest objready_request; - objready_request.set_objref(objref); - objready_request.set_objstoreid(objstoreid_); - AckReply objready_reply; - scheduler_stub_->ObjReady(&objready_context, objready_request, &objready_reply); - + } return Status::OK; } -*/ Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasRequest* request, AckReply* reply) { // NotifyAlias assumes that the objstore already holds canonical_objref ObjRef alias_objref = request->alias_objref(); ObjRef canonical_objref = request->canonical_objref(); ORCH_LOG(ORCH_DEBUG, "Aliasing objref " << alias_objref << " with objref " << canonical_objref); - std::lock_guard memory_lock(memory_lock_); - if (canonical_objref >= memory_.size()) { - ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.") + { + std::lock_guard memory_lock(memory_lock_); + if (canonical_objref >= memory_.size()) { + ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.") + } + if (memory_[canonical_objref].second == MemoryStatusType::NOT_READY) { + ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.") + } + if (memory_[canonical_objref].second == MemoryStatusType::NOT_PRESENT) { + ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not present in the objstore.") + } + if (memory_[canonical_objref].second == MemoryStatusType::DEALLOCATED) { + ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " has already been deallocated.") + } + if (alias_objref >= memory_.size()) { + memory_.resize(alias_objref + 1, std::make_pair(ObjHandle(), MemoryStatusType::NOT_PRESENT)); + } + memory_[alias_objref].first = memory_[canonical_objref].first; + memory_[alias_objref].second = MemoryStatusType::READY; } - if (memory_[canonical_objref].second == MemoryStatusType::NOT_READY) { - ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.") - } - if (memory_[canonical_objref].second == MemoryStatusType::NOT_PRESENT) { - ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not present in the objstore.") - } - if (memory_[canonical_objref].second == MemoryStatusType::DEALLOCATED) { - ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " has already been deallocated.") - } - if (alias_objref >= memory_.size()) { - memory_.resize(alias_objref + 1, std::make_pair(ObjHandle(), MemoryStatusType::NOT_PRESENT)); - } - memory_[alias_objref].first = memory_[canonical_objref].first; - memory_[alias_objref].second = MemoryStatusType::READY; - ObjRequest done_request; done_request.type = ObjRequestType::ALIAS_DONE; done_request.objref = alias_objref; @@ -154,7 +182,9 @@ Status ObjStoreService::DeallocateObject(ServerContext* context, const Deallocat if (canonical_objref >= memory_.size()) { ORCH_LOG(ORCH_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but it is not in the objstore."); } + segmentpool_lock_.lock(); segmentpool_->deallocate(memory_[canonical_objref].first); + segmentpool_lock_.unlock(); memory_[canonical_objref].second = MemoryStatusType::DEALLOCATED; return Status::OK; } @@ -197,15 +227,8 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { } switch (request.type) { case ObjRequestType::ALLOC: { - // TODO(rkn): Does segmentpool_ need a lock around it? - ObjHandle reply = segmentpool_->allocate(request.size); - send_queues_[request.workerid].send(&reply); - std::lock_guard memory_lock(memory_lock_); - if (memory_[request.objref].second != MemoryStatusType::NOT_PRESENT) { - ORCH_LOG(ORCH_FATAL, "Attempting to allocate space for objref " << request.objref << ", but memory_[objref].second != MemoryStatusType::NOT_PRESENT, it equals " << memory_[request.objref].second); - } - memory_[request.objref].first = reply; - memory_[request.objref].second = MemoryStatusType::NOT_READY; + ObjHandle handle = alloc(request.objref, request.size); // This method acquires memory_lock_ + send_queues_[request.workerid].send(&handle); } break; case ObjRequestType::GET: { @@ -214,7 +237,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { if (item.second == MemoryStatusType::READY) { ORCH_LOG(ORCH_DEBUG, "Responding to GET request: returning objref " << request.objref); send_queues_[request.workerid].send(&item.first); - } else if (item.second == MemoryStatusType::NOT_READY || item.second == MemoryStatusType::NOT_PRESENT) { + } else if (item.second == MemoryStatusType::NOT_READY || item.second == MemoryStatusType::NOT_PRESENT || item.second == MemoryStatusType::PRE_ALLOCED) { std::lock_guard lock(pull_queue_lock_); pull_queue_.push_back(std::make_pair(request.workerid, request.objref)); } else { @@ -223,24 +246,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { } break; case ObjRequestType::WORKER_DONE: { - { - std::lock_guard memory_lock(memory_lock_); - std::pair& item = memory_[request.objref]; - if (item.second != MemoryStatusType::NOT_READY) { - ORCH_LOG(ORCH_FATAL, "A worker notified the object store that objref " << request.objref << " has been written to the object store, but memory_[objref].second != NOT_READY."); - } - item.first.set_metadata_offset(request.metadata_offset); - item.second = MemoryStatusType::READY; - } - process_pulls_for_objref(request.objref); - // Tell the scheduler that the object arrived - // TODO(pcm): put this in a separate thread so we don't have to pay the latency here - ClientContext objready_context; - ObjReadyRequest objready_request; - objready_request.set_objref(request.objref); - objready_request.set_objstoreid(objstoreid_); - AckReply objready_reply; - scheduler_stub_->ObjReady(&objready_context, objready_request, &objready_reply); + object_ready(request.objref, request.metadata_offset); // This method acquires memory_lock_ } break; default: { @@ -293,6 +299,40 @@ void ObjStoreService::process_pulls_for_objref(ObjRef objref) { } } +ObjHandle ObjStoreService::alloc(ObjRef objref, size_t size) { + segmentpool_lock_.lock(); + ObjHandle handle = segmentpool_->allocate(size); + segmentpool_lock_.unlock(); + std::lock_guard memory_lock(memory_lock_); + if (memory_[objref].second != MemoryStatusType::NOT_PRESENT && memory_[objref].second != MemoryStatusType::PRE_ALLOCED) { + ORCH_LOG(ORCH_FATAL, "Attempting to allocate space for objref " << objref << ", but memory_[objref].second = " << memory_[objref].second); + } + memory_[objref].first = handle; + memory_[objref].second = MemoryStatusType::NOT_READY; + return handle; +} + +void ObjStoreService::object_ready(ObjRef objref, size_t metadata_offset) { + { + std::lock_guard memory_lock(memory_lock_); + std::pair& item = memory_[objref]; + if (item.second != MemoryStatusType::NOT_READY) { + ORCH_LOG(ORCH_FATAL, "A worker notified the object store that objref " << objref << " has been written to the object store, but memory_[objref].second != NOT_READY."); + } + item.first.set_metadata_offset(metadata_offset); + item.second = MemoryStatusType::READY; + } + process_pulls_for_objref(objref); + // Tell the scheduler that the object arrived + // TODO(pcm): put this in a separate thread so we don't have to pay the latency here + ClientContext objready_context; + ObjReadyRequest objready_request; + objready_request.set_objref(objref); + objready_request.set_objstoreid(objstoreid_); + AckReply objready_reply; + scheduler_stub_->ObjReady(&objready_context, objready_request, &objready_reply); +} + void ObjStoreService::start_objstore_service() { communicator_thread_ = std::thread([this]() { ORCH_LOG(ORCH_INFO, "started object store communicator server");