diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 7aa18e807..4d132b9bd 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -181,14 +181,14 @@ class ClientMmapTableEntry { pointer_ = reinterpret_cast(MapViewOfFile(reinterpret_cast(fh_get(fd)), FILE_MAP_ALL_ACCESS, 0, 0, length_)); // TODO(pcm): Don't fail here, instead return a Status. if (pointer_ == NULL) { - ARROW_LOG(FATAL) << "mmap failed"; + RAY_LOG(FATAL) << "mmap failed"; } #else pointer_ = reinterpret_cast( mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); // TODO(pcm): Don't fail here, instead return a Status. if (pointer_ == MAP_FAILED) { - ARROW_LOG(FATAL) << "mmap failed"; + RAY_LOG(FATAL) << "mmap failed"; } #endif close(fd); // Closing this fd has an effect on performance. @@ -207,7 +207,7 @@ class ClientMmapTableEntry { r = munmap(pointer_, length_); #endif if (r != 0) { - ARROW_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno; + RAY_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno; } } @@ -382,7 +382,7 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_ // process before. uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) { auto entry = mmap_table_.find(store_fd_val); - ARROW_CHECK(entry != mmap_table_.end()); + RAY_CHECK(entry != mmap_table_.end()); return entry->second->pointer(); } @@ -397,7 +397,7 @@ int PlasmaClient::Impl::GetStoreFd(int store_fd) { auto entry = mmap_table_.find(store_fd); if (entry == mmap_table_.end()) { int fd = recv_fd(store_conn_); - ARROW_CHECK(fd >= 0) << "recv not successful"; + RAY_CHECK(fd >= 0) << "recv not successful"; return fd; } else { return entry->second->fd(); @@ -421,7 +421,7 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id, object_entry = objects_in_use_[object_id].get(); } else { object_entry = elem->second.get(); - ARROW_CHECK(object_entry->count > 0); + RAY_CHECK(object_entry->count > 0); } // Increment the count of the number of instances of this object that are // being used by this client. The corresponding decrement should happen in @@ -435,7 +435,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, bool evict_if_full) { std::lock_guard guard(client_mutex_); - ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " + RAY_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " << data_size << " and metadata size " << metadata_size; RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, evict_if_full, data_size, metadata_size, device_num)); @@ -451,10 +451,10 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, // descriptor. if (device_num == 0) { int fd = GetStoreFd(store_fd); - ARROW_CHECK(object.data_size == data_size); - ARROW_CHECK(object.metadata_size == metadata_size); + RAY_CHECK(object.data_size == data_size); + RAY_CHECK(object.metadata_size == metadata_size); // The metadata should come right after the data. - ARROW_CHECK(object.metadata_offset == object.data_offset + data_size); + RAY_CHECK(object.metadata_offset == object.data_offset + data_size); *data = std::make_shared( shared_from_this(), LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, data_size); @@ -483,7 +483,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, } *data = MakeBufferFromGpuProcessHandle(handle); #else - ARROW_LOG(FATAL) << "Arrow GPU library is not enabled."; + RAY_LOG(FATAL) << "Arrow GPU library is not enabled."; #endif } @@ -505,7 +505,7 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id, bool evict_if_full) { std::lock_guard guard(client_mutex_); - ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_; + RAY_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_; RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, evict_if_full, data, metadata)); @@ -522,7 +522,7 @@ Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector& objec bool evict_if_full) { std::lock_guard guard(client_mutex_); - ARROW_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_; + RAY_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_; RETURN_NOT_OK(SendCreateAndSealBatchRequest(store_conn_, object_ids, evict_if_full, data, metadata)); @@ -551,9 +551,9 @@ Status PlasmaClient::Impl::GetBuffers( // This client created the object but hasn't sealed it. If we call Get // with no timeout, we will deadlock, because this client won't be able to // call Seal. - ARROW_CHECK(timeout_ms != -1) + RAY_CHECK(timeout_ms != -1) << "Plasma client called get on an unsealed object that it created"; - ARROW_LOG(WARNING) + RAY_LOG(WARNING) << "Attempting to get an object that this client created but hasn't sealed."; all_present = false; } else { @@ -568,11 +568,11 @@ Status PlasmaClient::Impl::GetBuffers( #ifdef PLASMA_CUDA std::lock_guard lock(gpu_mutex); auto iter = gpu_object_map.find(object_ids[i]); - ARROW_CHECK(iter != gpu_object_map.end()); + RAY_CHECK(iter != gpu_object_map.end()); iter->second->client_count++; physical_buf = MakeBufferFromGpuProcessHandle(iter->second); #else - ARROW_LOG(FATAL) << "Arrow GPU library is not enabled."; + RAY_LOG(FATAL) << "Arrow GPU library is not enabled."; #endif } physical_buf = wrap_buffer(object_ids[i], physical_buf); @@ -612,12 +612,12 @@ Status PlasmaClient::Impl::GetBuffers( } for (int64_t i = 0; i < num_objects; ++i) { - DCHECK(received_object_ids[i] == object_ids[i]); + RAY_DCHECK(received_object_ids[i] == object_ids[i]); object = &object_data[i]; if (object_buffers[i].data) { // If the object was already in use by the client, then the store should // have returned it. - DCHECK_NE(object->data_size, -1); + RAY_DCHECK(object->data_size != -1); // We've already filled out the information for this object, so we can // just continue. continue; @@ -648,7 +648,7 @@ Status PlasmaClient::Impl::GetBuffers( physical_buf = MakeBufferFromGpuProcessHandle(iter->second); } #else - ARROW_LOG(FATAL) << "Arrow GPU library is not enabled."; + RAY_LOG(FATAL) << "Arrow GPU library is not enabled."; #endif } // Finish filling out the return values. @@ -663,8 +663,8 @@ Status PlasmaClient::Impl::GetBuffers( } else { // The object was not retrieved. The caller can detect this condition // by checking the boolean value of the metadata/data buffers. - DCHECK(!object_buffers[i].metadata); - DCHECK(!object_buffers[i].data); + RAY_DCHECK(!object_buffers[i].metadata); + RAY_DCHECK(!object_buffers[i].data); } } return Status::OK(); @@ -694,8 +694,8 @@ Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects, Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) { auto object_entry = objects_in_use_.find(object_id); - ARROW_CHECK(object_entry != objects_in_use_.end()); - ARROW_CHECK(object_entry->second->count == 0); + RAY_CHECK(object_entry != objects_in_use_.end()); + RAY_CHECK(object_entry->second->count == 0); // Remove the entry from the hash table of objects currently in use. objects_in_use_.erase(object_id); @@ -710,13 +710,13 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) { return Status::OK(); } auto object_entry = objects_in_use_.find(object_id); - ARROW_CHECK(object_entry != objects_in_use_.end()); + RAY_CHECK(object_entry != objects_in_use_.end()); #ifdef PLASMA_CUDA if (object_entry->second->object.device_num != 0) { std::lock_guard lock(gpu_mutex); auto iter = gpu_object_map.find(object_id); - ARROW_CHECK(iter != gpu_object_map.end()); + RAY_CHECK(iter != gpu_object_map.end()); if (--iter->second->client_count == 0) { delete iter->second; gpu_object_map.erase(iter); @@ -725,7 +725,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) { #endif object_entry->second->count -= 1; - ARROW_CHECK(object_entry->second->count >= 0); + RAY_CHECK(object_entry->second->count >= 0); // Check if the client is no longer using this object. if (object_entry->second->count == 0) { // Tell the store that the client no longer needs the object. @@ -754,7 +754,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaContainsReply, &buffer)); ObjectID object_id2; - DCHECK_GT(buffer.size(), 0); + RAY_DCHECK(buffer.size() > 0); RETURN_NOT_OK( ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object)); } @@ -796,7 +796,7 @@ bool PlasmaClient::Impl::ComputeObjectHashParallel(XXH64_state_t* hash_state, &threadhash[num_threads]); for (auto& fut : futures) { - ARROW_CHECK_OK(fut.status()); + RAY_ARROW_CHECK_OK(fut.status()); } XXH64_update(hash_state, reinterpret_cast(threadhash), @@ -816,8 +816,8 @@ uint64_t PlasmaClient::Impl::ComputeObjectHash(const ObjectBuffer& obj_buffer) { uint64_t PlasmaClient::Impl::ComputeObjectHashCPU(const uint8_t* data, int64_t data_size, const uint8_t* metadata, int64_t metadata_size) { - DCHECK(metadata); - DCHECK(data); + RAY_DCHECK(metadata); + RAY_DCHECK(data); XXH64_state_t hash_state; XXH64_reset(&hash_state, XXH64_DEFAULT_SEED); if (data_size >= kBytesInMB) { @@ -854,7 +854,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSealReply, &buffer)); ObjectID sealed_id; RETURN_NOT_OK(ReadSealReply(buffer.data(), buffer.size(), &sealed_id)); - ARROW_CHECK(sealed_id == object_id); + RAY_CHECK(sealed_id == object_id); // We call PlasmaClient::Release to decrement the number of instances of this // object // that are currently being used by this client. The corresponding increment @@ -866,9 +866,9 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { std::lock_guard guard(client_mutex_); auto object_entry = objects_in_use_.find(object_id); - ARROW_CHECK(object_entry != objects_in_use_.end()) + RAY_CHECK(object_entry != objects_in_use_.end()) << "Plasma client called abort on an object without a reference to it"; - ARROW_CHECK(!object_entry->second->is_sealed) + RAY_CHECK(!object_entry->second->is_sealed) << "Plasma client called abort on a sealed object"; // Make sure that the Plasma client only has one reference to the object. If @@ -882,8 +882,8 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { if (object_entry->second->object.device_num != 0) { std::lock_guard lock(gpu_mutex); auto iter = gpu_object_map.find(object_id); - ARROW_CHECK(iter != gpu_object_map.end()); - ARROW_CHECK(iter->second->client_count == 1); + RAY_CHECK(iter != gpu_object_map.end()); + RAY_CHECK(iter->second->client_count == 1); delete iter->second; gpu_object_map.erase(iter); } @@ -918,7 +918,7 @@ Status PlasmaClient::Impl::Delete(const std::vector& object_ids) { RETURN_NOT_OK(SendDeleteRequest(store_conn_, not_in_use_ids)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaDeleteReply, &buffer)); - DCHECK_GT(buffer.size(), 0); + RAY_DCHECK(buffer.size() > 0); std::vector error_codes; not_in_use_ids.clear(); RETURN_NOT_OK( @@ -983,16 +983,16 @@ Status PlasmaClient::Impl::Subscribe(int* fd) { // Make the socket non-blocking. #ifdef _WINSOCKAPI_ unsigned long value = 1; - ARROW_CHECK(ioctlsocket(sock[1], FIONBIO, &value) == 0); + RAY_CHECK(ioctlsocket(sock[1], FIONBIO, &value) == 0); #else int flags = fcntl(sock[1], F_GETFL, 0); - ARROW_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0); + RAY_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0); #endif // Tell the Plasma store about the subscription. RETURN_NOT_OK(SendSubscribeRequest(store_conn_)); // Send the file descriptor that the Plasma store should use to push // notifications about sealed objects to this client. - ARROW_CHECK(send_fd(store_conn_, sock[1]) >= 0); + RAY_CHECK(send_fd(store_conn_, sock[1]) >= 0); close(sock[1]); // Return the file descriptor that the client should use to read notifications // about sealed objects. @@ -1063,7 +1063,7 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, return Status::NotImplemented("plasma manager is no longer supported"); } if (release_delay != 0) { - ARROW_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect " + RAY_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect " << "is deprecated"; } // Send a ConnectRequest to the store to get its memory capacity. diff --git a/src/ray/object_manager/plasma/dlmalloc.cc b/src/ray/object_manager/plasma/dlmalloc.cc index 27c8e704c..822cac0e4 100644 --- a/src/ray/object_manager/plasma/dlmalloc.cc +++ b/src/ray/object_manager/plasma/dlmalloc.cc @@ -59,7 +59,7 @@ int fake_munmap(void*, int64_t); #undef HAVE_MORECORE #undef DEFAULT_GRANULARITY -// dlmalloc.c defined DEBUG which will conflict with ARROW_LOG(DEBUG). +// dlmalloc.c defined DEBUG which will conflict with RAY_LOG(DEBUG). #ifdef DEBUG #undef DEBUG #endif @@ -90,12 +90,12 @@ int create_buffer(int64_t size) { file_name.push_back('\0'); fd = mkstemp(&file_name[0]); if (fd < 0) { - ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0]; + RAY_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0]; return -1; } // Immediately unlink the file so we do not leave traces in the system. if (unlink(&file_name[0]) != 0) { - ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0]; + RAY_LOG(FATAL) << "failed to unlink file " << &file_name[0]; return -1; } if (!plasma_config->hugepages_enabled) { @@ -103,7 +103,7 @@ int create_buffer(int64_t size) { // needed for files that are backed by the huge page fs, see also // http://www.mail-archive.com/kvm-devel@lists.sourceforge.net/msg14737.html if (ftruncate(fd, (off_t)size) != 0) { - ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0]; + RAY_LOG(FATAL) << "failed to ftruncate file " << &file_name[0]; return -1; } } @@ -118,7 +118,7 @@ void* fake_mmap(size_t size) { size += kMmapRegionsGap; int fd = create_buffer(size); - ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap"; + RAY_CHECK(fd >= 0) << "Failed to create buffer during mmap"; // MAP_POPULATE can be used to pre-populate the page tables for this memory region // which avoids work when accessing the pages later. However it causes long pauses // when mmapping the files. Only supported on Linux. @@ -126,15 +126,15 @@ void* fake_mmap(size_t size) { #ifdef _WIN32 pointer = MapViewOfFile(reinterpret_cast(fh_get(fd)), FILE_MAP_ALL_ACCESS, 0, 0, size); if (pointer == NULL) { - ARROW_LOG(ERROR) << "MapViewOfFile failed with error: " << GetLastError(); + RAY_LOG(ERROR) << "MapViewOfFile failed with error: " << GetLastError(); return reinterpret_cast(-1); } #else pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (pointer == MAP_FAILED) { - ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); + RAY_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno); if (errno == ENOMEM && plasma_config->hugepages_enabled) { - ARROW_LOG(ERROR) + RAY_LOG(ERROR) << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; } return pointer; @@ -150,12 +150,12 @@ void* fake_mmap(size_t size) { // We lie to dlmalloc about where mapped memory actually lives. pointer = pointer_advance(pointer, kMmapRegionsGap); - ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")"; + RAY_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")"; return pointer; } int fake_munmap(void* addr, int64_t size) { - ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")"; + RAY_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")"; addr = pointer_retreat(addr, kMmapRegionsGap); size += kMmapRegionsGap; diff --git a/src/ray/object_manager/plasma/eviction_policy.cc b/src/ray/object_manager/plasma/eviction_policy.cc index 8611cafa1..4c053e82e 100644 --- a/src/ray/object_manager/plasma/eviction_policy.cc +++ b/src/ray/object_manager/plasma/eviction_policy.cc @@ -25,7 +25,7 @@ namespace plasma { void LRUCache::Add(const ObjectID& key, int64_t size) { auto it = item_map_.find(key); - ARROW_CHECK(it == item_map_.end()); + RAY_CHECK(it == item_map_.end()); // Note that it is important to use a list so the iterators stay valid. item_list_.emplace_front(key, size); item_map_.emplace(key, item_list_.begin()); @@ -41,15 +41,15 @@ int64_t LRUCache::Remove(const ObjectID& key) { used_capacity_ -= size; item_list_.erase(it->second); item_map_.erase(it); - ARROW_CHECK(used_capacity_ >= 0) << DebugString(); + RAY_CHECK(used_capacity_ >= 0) << DebugString(); return size; } void LRUCache::AdjustCapacity(int64_t delta) { - ARROW_LOG(INFO) << "adjusting global lru capacity from " << Capacity() << " to " + RAY_LOG(INFO) << "adjusting global lru capacity from " << Capacity() << " to " << (Capacity() + delta) << " (max " << OriginalCapacity() << ")"; capacity_ += delta; - ARROW_CHECK(used_capacity_ >= 0) << DebugString(); + RAY_CHECK(used_capacity_ >= 0) << DebugString(); } int64_t LRUCache::Capacity() const { return capacity_; } @@ -128,10 +128,10 @@ bool EvictionPolicy::RequireSpace(int64_t size, std::vector* objects_t // up to 20% of the total capacity. int64_t space_to_free = std::max(required_space, PlasmaAllocator::GetFootprintLimit() / 5); - ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects"; + RAY_LOG(DEBUG) << "not enough space to create this object, so evicting objects"; // Choose some objects to evict, and update the return pointers. int64_t num_bytes_evicted = ChooseObjectsToEvict(space_to_free, objects_to_evict); - ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting " + RAY_LOG(INFO) << "There is not enough space to create this object, so evicting " << objects_to_evict->size() << " objects to free up " << num_bytes_evicted << " bytes. The number of bytes in use (before " << "this eviction) is " << PlasmaAllocator::Allocated() << "."; diff --git a/src/ray/object_manager/plasma/fling.cc b/src/ray/object_manager/plasma/fling.cc index 8cd3a84b2..cf2ec705b 100644 --- a/src/ray/object_manager/plasma/fling.cc +++ b/src/ray/object_manager/plasma/fling.cc @@ -16,7 +16,7 @@ #include -#include "arrow/util/logging.h" +#include "ray/util/logging.h" #ifdef _WIN32 #include // socklen_t @@ -71,7 +71,7 @@ int send_fd(int conn, int fd) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } else if (errno == EMSGSIZE) { - ARROW_LOG(WARNING) << "Failed to send file descriptor" + RAY_LOG(WARNING) << "Failed to send file descriptor" << " (errno = EMSGSIZE), retrying."; // If we failed to send the file descriptor, loop until we have sent it // successfully. TODO(rkn): This is problematic for two reasons. First @@ -81,14 +81,14 @@ int send_fd(int conn, int fd) { // plasma store event loop which should never happen. continue; } else { - ARROW_LOG(INFO) << "Error in send_fd (errno = " << errno << ")"; + RAY_LOG(INFO) << "Error in send_fd (errno = " << errno << ")"; return static_cast(r); } } else if (r == 0) { - ARROW_LOG(INFO) << "Encountered unexpected EOF"; + RAY_LOG(INFO) << "Encountered unexpected EOF"; return 0; } else { - ARROW_CHECK(r > 0); + RAY_CHECK(r > 0); return static_cast(r); } } @@ -111,7 +111,7 @@ int recv_fd(int conn) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } else { - ARROW_LOG(INFO) << "Error in recv_fd (errno = " << errno << ")"; + RAY_LOG(INFO) << "Error in recv_fd (errno = " << errno << ")"; return -1; } } else { diff --git a/src/ray/object_manager/plasma/hash_table_store.cc b/src/ray/object_manager/plasma/hash_table_store.cc deleted file mode 100644 index 251d39db1..000000000 --- a/src/ray/object_manager/plasma/hash_table_store.cc +++ /dev/null @@ -1,58 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include - -#include "arrow/util/logging.h" - -#include "ray/object_manager/plasma/hash_table_store.h" - -namespace plasma { - -Status HashTableStore::Connect(const std::string& endpoint) { return Status::OK(); } - -Status HashTableStore::Put(const std::vector& ids, - const std::vector>& data) { - for (size_t i = 0; i < ids.size(); ++i) { - table_[ids[i]] = data[i]->ToString(); - } - return Status::OK(); -} - -Status HashTableStore::Get(const std::vector& ids, - std::vector> buffers) { - ARROW_CHECK(ids.size() == buffers.size()); - for (size_t i = 0; i < ids.size(); ++i) { - bool valid; - HashTable::iterator result; - { - result = table_.find(ids[i]); - valid = result != table_.end(); - } - if (valid) { - ARROW_CHECK(buffers[i]->size() == static_cast(result->second.size())); - std::memcpy(buffers[i]->mutable_data(), result->second.data(), - result->second.size()); - } - } - return Status::OK(); -} - -REGISTER_EXTERNAL_STORE("hashtable", HashTableStore); - -} // namespace plasma diff --git a/src/ray/object_manager/plasma/hash_table_store.h b/src/ray/object_manager/plasma/hash_table_store.h deleted file mode 100644 index a3e9f082f..000000000 --- a/src/ray/object_manager/plasma/hash_table_store.h +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include - -#include "ray/object_manager/plasma/external_store.h" - -namespace plasma { - -// This is a sample implementation for an external store, for illustration -// purposes only. - -class HashTableStore : public ExternalStore { - public: - HashTableStore() = default; - - Status Connect(const std::string& endpoint) override; - - Status Get(const std::vector& ids, - std::vector> buffers) override; - - Status Put(const std::vector& ids, - const std::vector>& data) override; - - private: - typedef std::unordered_map HashTable; - - HashTable table_; -}; - -} // namespace plasma diff --git a/src/ray/object_manager/plasma/io.cc b/src/ray/object_manager/plasma/io.cc index cc0fdc973..efa8c601a 100644 --- a/src/ray/object_manager/plasma/io.cc +++ b/src/ray/object_manager/plasma/io.cc @@ -58,7 +58,7 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) { } else if (nbytes == 0) { return Status::IOError("Encountered unexpected EOF"); } - ARROW_CHECK(nbytes > 0); + RAY_CHECK(nbytes > 0); bytesleft -= nbytes; offset += nbytes; } @@ -89,7 +89,7 @@ Status ReadBytes(int fd, uint8_t* cursor, size_t length) { } else if (0 == nbytes) { return Status::IOError("Encountered unexpected EOF"); } - ARROW_CHECK(nbytes > 0); + RAY_CHECK(nbytes > 0); bytesleft -= nbytes; offset += nbytes; } @@ -101,7 +101,7 @@ Status ReadMessage(int fd, MessageType* type, std::vector* buffer) { int64_t version; RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast(&version), sizeof(version)), *type = MessageType::PlasmaDisconnectClient); - ARROW_CHECK(version == kPlasmaProtocolVersion) << "version = " << version; + RAY_CHECK(version == kPlasmaProtocolVersion) << "version = " << version; RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast(type), sizeof(*type)), *type = MessageType::PlasmaDisconnectClient); int64_t length_temp; @@ -139,18 +139,18 @@ int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) { socket_address.in.sin_addr.s_addr = inet_addr(addr.substr(0, i).c_str()); socket_address.in.sin_port = htons(static_cast(atoi(addr.substr(j).c_str()))); if (socket_address.in.sin_addr.s_addr == INADDR_NONE) { - ARROW_LOG(ERROR) << "Socket address is not a valid IPv4 address: " << pathname; + RAY_LOG(ERROR) << "Socket address is not a valid IPv4 address: " << pathname; return -1; } if (socket_address.in.sin_port == htons(0)) { - ARROW_LOG(ERROR) << "Socket address is missing a valid port: " << pathname; + RAY_LOG(ERROR) << "Socket address is missing a valid port: " << pathname; return -1; } } else { addrlen = sizeof(socket_address.un); socket_address.un.sun_family = AF_UNIX; if (pathname.size() + 1 > sizeof(socket_address.un.sun_path)) { - ARROW_LOG(ERROR) << "Socket pathname is too long."; + RAY_LOG(ERROR) << "Socket pathname is too long."; return -1; } strncpy(socket_address.un.sun_path, pathname.c_str(), pathname.size() + 1); @@ -158,7 +158,7 @@ int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) { int socket_fd = socket(socket_address.addr.sa_family, SOCK_STREAM, 0); if (socket_fd < 0) { - ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname; + RAY_LOG(ERROR) << "socket() failed for pathname " << pathname; return -1; } if (shall_listen) { @@ -166,7 +166,7 @@ int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) { int on = 1; if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&on), sizeof(on)) < 0) { - ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname; + RAY_LOG(ERROR) << "setsockopt failed for pathname " << pathname; close(socket_fd); return -1; } @@ -179,13 +179,13 @@ int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) { #endif } if (bind(socket_fd, &socket_address.addr, addrlen) != 0) { - ARROW_LOG(ERROR) << "Bind failed for pathname " << pathname; + RAY_LOG(ERROR) << "Bind failed for pathname " << pathname; close(socket_fd); return -1; } if (listen(socket_fd, 128) == -1) { - ARROW_LOG(ERROR) << "Could not listen to socket " << pathname; + RAY_LOG(ERROR) << "Could not listen to socket " << pathname; close(socket_fd); return -1; } @@ -209,7 +209,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries, } *fd = ConnectOrListenIpcSock(pathname, false); while (*fd < 0 && num_retries > 0) { - ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname + RAY_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname << ", retrying " << num_retries << " more times"; // Sleep for timeout milliseconds. usleep(static_cast(timeout * 1000)); @@ -228,7 +228,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries, int AcceptClient(int socket_fd) { int client_fd = accept(socket_fd, NULL, NULL); if (client_fd < 0) { - ARROW_LOG(ERROR) << "Error reading from socket."; + RAY_LOG(ERROR) << "Error reading from socket."; return -1; } return client_fd; @@ -239,7 +239,7 @@ std::unique_ptr ReadMessageAsync(int sock) { Status s = ReadBytes(sock, reinterpret_cast(&size), sizeof(int64_t)); if (!s.ok()) { // The other side has closed the socket. - ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred."; + RAY_LOG(DEBUG) << "Socket has been closed, or some other error has occurred."; close(sock); return NULL; } @@ -247,7 +247,7 @@ std::unique_ptr ReadMessageAsync(int sock) { s = ReadBytes(sock, message.get(), size); if (!s.ok()) { // The other side has closed the socket. - ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred."; + RAY_LOG(DEBUG) << "Socket has been closed, or some other error has occurred."; close(sock); return NULL; } diff --git a/src/ray/object_manager/plasma/malloc.cc b/src/ray/object_manager/plasma/malloc.cc index 4160d4e9c..95fa7fa55 100644 --- a/src/ray/object_manager/plasma/malloc.cc +++ b/src/ray/object_manager/plasma/malloc.cc @@ -61,7 +61,7 @@ int64_t GetMmapSize(int fd) { return entry.second.size; } } - ARROW_LOG(FATAL) << "failed to find entry in mmap_records for fd " << fd; + RAY_LOG(FATAL) << "failed to find entry in mmap_records for fd " << fd; return -1; // This code is never reached. } diff --git a/src/ray/object_manager/plasma/plasma.cc b/src/ray/object_manager/plasma/plasma.cc index 55c2066cd..b65588b6a 100644 --- a/src/ray/object_manager/plasma/plasma.cc +++ b/src/ray/object_manager/plasma/plasma.cc @@ -38,14 +38,14 @@ int WarnIfSigpipe(int status, int client_sock) { return 0; } if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { - ARROW_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " + RAY_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " "sending a message to client on fd " << client_sock << ". The client on the other end may " "have hung up."; return errno; } - ARROW_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << "."; + RAY_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << "."; return -1; // This is never reached. } diff --git a/src/ray/object_manager/plasma/plasma.h b/src/ray/object_manager/plasma/plasma.h index 5dbc502e8..5fdbd5f35 100644 --- a/src/ray/object_manager/plasma/plasma.h +++ b/src/ray/object_manager/plasma/plasma.h @@ -31,12 +31,13 @@ #include #include +#include "ray/common/status.h" #include "ray/object_manager/plasma/compat.h" #include "arrow/status.h" -#include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "ray/object_manager/plasma/common.h" +#include "ray/util/logging.h" #ifdef PLASMA_CUDA using arrow::cuda::CudaIpcMemHandle; @@ -53,7 +54,7 @@ struct ObjectInfoT; Status _s = (s); \ if (!_s.ok()) { \ if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { \ - ARROW_LOG(WARNING) \ + RAY_LOG(WARNING) \ << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \ "sending a message to client on fd " \ << fd_ \ diff --git a/src/ray/object_manager/plasma/plasma_allocator.cc b/src/ray/object_manager/plasma/plasma_allocator.cc index 82ec4c54b..49e052571 100644 --- a/src/ray/object_manager/plasma/plasma_allocator.cc +++ b/src/ray/object_manager/plasma/plasma_allocator.cc @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include +#include "ray/util/logging.h" #include "ray/object_manager/plasma/malloc.h" #include "ray/object_manager/plasma/plasma_allocator.h" @@ -35,7 +35,7 @@ void* PlasmaAllocator::Memalign(size_t alignment, size_t bytes) { return nullptr; } void* mem = dlmemalign(alignment, bytes); - ARROW_CHECK(mem); + RAY_CHECK(mem); allocated_ += bytes; return mem; } diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index c308e7b77..8c2872cbe 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -72,7 +72,7 @@ flatbuffers::Offset> ToFlatbuffer( Status PlasmaReceive(int sock, MessageType message_type, std::vector* buffer) { MessageType type; RETURN_NOT_OK(ReadMessage(sock, &type, buffer)); - ARROW_CHECK(type == message_type) + RAY_CHECK(type == message_type) << "type = " << static_cast(type) << ", message_type = " << static_cast(message_type); return Status::OK(); @@ -121,7 +121,7 @@ Status PlasmaErrorStatus(fb::PlasmaError plasma_error) { return MakePlasmaError(PlasmaErrorCode::PlasmaStoreFull, "object does not fit in the plasma store"); default: - ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast(plasma_error); + RAY_LOG(FATAL) << "unknown plasma error code " << static_cast(plasma_error); } return Status::OK(); } @@ -138,9 +138,9 @@ Status SendSetOptionsRequest(int sock, const std::string& client_name, Status ReadSetOptionsRequest(uint8_t* data, size_t size, std::string* client_name, int64_t* output_memory_quota) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *client_name = std::string(message->client_name()->str()); *output_memory_quota = message->output_memory_quota(); return Status::OK(); @@ -153,9 +153,9 @@ Status SendSetOptionsReply(int sock, PlasmaError error) { } Status ReadSetOptionsReply(uint8_t* data, size_t size) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); return PlasmaErrorStatus(message->error()); } @@ -174,9 +174,9 @@ Status SendGetDebugStringReply(int sock, const std::string& debug_string) { } Status ReadGetDebugStringReply(uint8_t* data, size_t size, std::string* debug_string) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *debug_string = message->debug_string()->str(); return Status::OK(); } @@ -195,9 +195,9 @@ Status SendCreateRequest(int sock, ObjectID object_id, bool evict_if_full, Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id, bool* evict_if_full, int64_t* data_size, int64_t* metadata_size, int* device_num) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *evict_if_full = message->evict_if_full(); *data_size = message->data_size(); *metadata_size = message->metadata_size(); @@ -232,7 +232,7 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, #ifdef PLASMA_CUDA crb.add_ipc_handle(ipc_handle); #else - ARROW_LOG(FATAL) << "This should be unreachable."; + RAY_LOG(FATAL) << "This should be unreachable."; #endif } auto message = crb.Finish(); @@ -241,9 +241,9 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id, PlasmaObject* object, int* store_fd, int64_t* mmap_size) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); object->store_fd = message->plasma_object()->segment_index(); object->data_offset = message->plasma_object()->data_offset(); @@ -277,9 +277,9 @@ Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_ Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id, bool* evict_if_full, std::string* object_data, std::string* metadata) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); *evict_if_full = message->evict_if_full(); @@ -306,9 +306,9 @@ Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size, bool* evict_if_full, std::vector* object_data, std::vector* metadata) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *evict_if_full = message->evict_if_full(); ConvertToVector(message->object_ids(), object_ids, @@ -332,9 +332,9 @@ Status SendCreateAndSealReply(int sock, PlasmaError error) { } Status ReadCreateAndSealReply(uint8_t* data, size_t size) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); return PlasmaErrorStatus(message->error()); } @@ -346,9 +346,9 @@ Status SendCreateAndSealBatchReply(int sock, PlasmaError error) { } Status ReadCreateAndSealBatchReply(uint8_t* data, size_t size) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); return PlasmaErrorStatus(message->error()); } @@ -359,9 +359,9 @@ Status SendAbortRequest(int sock, ObjectID object_id) { } Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); return Status::OK(); } @@ -373,9 +373,9 @@ Status SendAbortReply(int sock, ObjectID object_id) { } Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); return Status::OK(); } @@ -389,9 +389,9 @@ Status SendSealRequest(int sock, ObjectID object_id) { } Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); return Status::OK(); } @@ -404,9 +404,9 @@ Status SendSealReply(int sock, ObjectID object_id, PlasmaError error) { } Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); return PlasmaErrorStatus(message->error()); } @@ -421,9 +421,9 @@ Status SendReleaseRequest(int sock, ObjectID object_id) { } Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); return Status::OK(); } @@ -436,9 +436,9 @@ Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error) { } Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); return PlasmaErrorStatus(message->error()); } @@ -456,10 +456,10 @@ Status SendDeleteRequest(int sock, const std::vector& object_ids) { Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector* object_ids) { using fb::PlasmaDeleteRequest; - DCHECK(data); - DCHECK(object_ids); + RAY_DCHECK(data); + RAY_DCHECK(object_ids); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); ToVector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) { return ObjectID::FromBinary(request.object_ids()->Get(i)->str()); }); @@ -468,7 +468,7 @@ Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector* obje Status SendDeleteReply(int sock, const std::vector& object_ids, const std::vector& errors) { - DCHECK(object_ids.size() == errors.size()); + RAY_DCHECK(object_ids.size() == errors.size()); flatbuffers::FlatBufferBuilder fbb; auto message = fb::CreatePlasmaDeleteReply( fbb, static_cast(object_ids.size()), @@ -483,11 +483,11 @@ Status ReadDeleteReply(uint8_t* data, size_t size, std::vector* object std::vector* errors) { using fb::PlasmaDeleteReply; - DCHECK(data); - DCHECK(object_ids); - DCHECK(errors); + RAY_DCHECK(data); + RAY_DCHECK(object_ids); + RAY_DCHECK(errors); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); ToVector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) { return ObjectID::FromBinary(request.object_ids()->Get(i)->str()); }); @@ -507,9 +507,9 @@ Status SendContainsRequest(int sock, ObjectID object_id) { } Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); return Status::OK(); } @@ -523,9 +523,9 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) { Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id, bool* has_object) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); *has_object = message->has_object(); return Status::OK(); @@ -548,9 +548,9 @@ Status SendConnectReply(int sock, int64_t memory_capacity) { } Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *memory_capacity = message->memory_capacity(); return Status::OK(); } @@ -564,9 +564,9 @@ Status SendEvictRequest(int sock, int64_t num_bytes) { } Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *num_bytes = message->num_bytes(); return Status::OK(); } @@ -578,9 +578,9 @@ Status SendEvictReply(int sock, int64_t num_bytes) { } Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); num_bytes = message->num_bytes(); return Status::OK(); } @@ -597,9 +597,9 @@ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects, Status ReadGetRequest(uint8_t* data, size_t size, std::vector& object_ids, int64_t* timeout_ms) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { auto object_id = message->object_ids()->Get(i)->str(); object_ids.push_back(ObjectID::FromBinary(object_id)); @@ -642,12 +642,12 @@ Status SendGetReply(int sock, ObjectID object_ids[], Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], PlasmaObject plasma_objects[], int64_t num_objects, std::vector& store_fds, std::vector& mmap_sizes) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); #ifdef PLASMA_CUDA int handle_pos = 0; #endif - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); for (uoffset_t i = 0; i < num_objects; ++i) { object_ids[i] = ObjectID::FromBinary(message->object_ids()->Get(i)->str()); } @@ -668,7 +668,7 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[], } #endif } - ARROW_CHECK(message->store_fds()->size() == message->mmap_sizes()->size()); + RAY_CHECK(message->store_fds()->size() == message->mmap_sizes()->size()); for (uoffset_t i = 0; i < message->store_fds()->size(); i++) { store_fds.push_back(message->store_fds()->Get(i)); mmap_sizes.push_back(message->mmap_sizes()->Get(i)); @@ -696,10 +696,10 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address, int* port) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); - DCHECK(message->object_id()->size() == sizeof(ObjectID)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(message->object_id()->size() == sizeof(ObjectID)); *object_id = ObjectID::FromBinary(message->object_id()->str()); #ifdef _WIN32 *address = _strdup(message->address()->c_str()); @@ -720,9 +720,9 @@ Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); *object_id = ObjectID::FromBinary(message->object_id()->str()); *object_size = static_cast(message->object_size()); *metadata_size = static_cast(message->metadata_size()); @@ -742,9 +742,9 @@ Status SendRefreshLRURequest(int sock, const std::vector& object_ids) Status ReadRefreshLRURequest(uint8_t* data, size_t size, std::vector* object_ids) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { auto object_id = message->object_ids()->Get(i)->str(); object_ids->push_back(ObjectID::FromBinary(object_id)); @@ -759,9 +759,9 @@ Status SendRefreshLRUReply(int sock) { } Status ReadRefreshLRUReply(uint8_t* data, size_t size) { - DCHECK(data); + RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); - DCHECK(VerifyFlatbuffer(message, data, size)); + RAY_DCHECK(VerifyFlatbuffer(message, data, size)); return Status::OK(); } diff --git a/src/ray/object_manager/plasma/quota_aware_policy.cc b/src/ray/object_manager/plasma/quota_aware_policy.cc index 67a6cb943..af7acc2fa 100644 --- a/src/ray/object_manager/plasma/quota_aware_policy.cc +++ b/src/ray/object_manager/plasma/quota_aware_policy.cc @@ -47,13 +47,13 @@ void QuotaAwarePolicy::ObjectCreated(const ObjectID& object_id, Client* client, bool QuotaAwarePolicy::SetClientQuota(Client* client, int64_t output_memory_quota) { if (per_client_cache_.find(client) != per_client_cache_.end()) { - ARROW_LOG(WARNING) << "Cannot change the client quota once set"; + RAY_LOG(WARNING) << "Cannot change the client quota once set"; return false; } if (cache_.Capacity() - output_memory_quota < cache_.OriginalCapacity() * kGlobalLruReserveFraction) { - ARROW_LOG(WARNING) << "Not enough memory to set client quota: " << DebugString(); + RAY_LOG(WARNING) << "Not enough memory to set client quota: " << DebugString(); return false; } @@ -72,7 +72,7 @@ bool QuotaAwarePolicy::EnforcePerClientQuota(Client* client, int64_t size, bool auto& client_cache = per_client_cache_[client]; if (size > client_cache->Capacity()) { - ARROW_LOG(WARNING) << "object too large (" << size + RAY_LOG(WARNING) << "object too large (" << size << " bytes) to fit in client quota " << client_cache->Capacity() << " " << DebugString(); return false; diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index d564242c5..76bda0f27 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -180,7 +180,7 @@ uint8_t* PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, int* fd, if (pointer != nullptr) { GetMallocMapinfo(pointer, fd, map_size, offset); - ARROW_CHECK(*fd != -1); + RAY_CHECK(*fd != -1); } return pointer; } @@ -209,7 +209,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f int64_t data_size, int64_t metadata_size, int device_num, Client* client, PlasmaObject* result) { - ARROW_LOG(DEBUG) << "creating object " << object_id.Hex(); + RAY_LOG(DEBUG) << "creating object " << object_id.Hex(); auto entry = GetObjectTableEntry(&store_info_, object_id); if (entry != nullptr) { @@ -228,7 +228,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f pointer = AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true); if (!pointer) { - ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.Hex() + RAY_LOG(ERROR) << "Not enough memory to create the object " << object_id.Hex() << ", data_size=" << data_size << ", metadata_size=" << metadata_size << ", will send a reply of PlasmaError::OutOfMemory"; @@ -240,12 +240,12 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle; auto st = AllocateCudaMemory(device_num, total_size, &pointer, &ipc_handle); if (!st.ok()) { - ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString(); + RAY_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString(); return PlasmaError::OutOfMemory; } result->ipc_handle = ipc_handle; #else - ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled"; + RAY_LOG(ERROR) << "device_num != 0 but CUDA not enabled"; return PlasmaError::OutOfMemory; #endif } @@ -284,9 +284,9 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f } void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) { - DCHECK(object != nullptr); - DCHECK(entry != nullptr); - DCHECK(entry->state == ObjectState::PLASMA_SEALED); + RAY_DCHECK(object != nullptr); + RAY_DCHECK(entry != nullptr); + RAY_DCHECK(entry->state == ObjectState::PLASMA_SEALED); #ifdef PLASMA_CUDA if (entry->device_num != 0) { object->ipc_handle = entry->ipc_handle; @@ -321,7 +321,7 @@ void PlasmaStore::RemoveGetRequest(GetRequest* get_request) { } // Remove the get request. if (get_request->timer != -1) { - ARROW_CHECK(loop_->RemoveTimer(get_request->timer) == kEventLoopOk); + RAY_CHECK(loop_->RemoveTimer(get_request->timer) == kEventLoopOk); } delete get_request; } @@ -338,7 +338,7 @@ void PlasmaStore::RemoveGetRequestsForClient(Client* client) { // It shouldn't be possible for a given client to be in the middle of multiple get // requests. - ARROW_CHECK(get_requests_to_remove.size() <= 1); + RAY_CHECK(get_requests_to_remove.size() <= 1); for (GetRequest* get_request : get_requests_to_remove) { RemoveGetRequest(get_request); } @@ -399,7 +399,7 @@ void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) { for (size_t i = 0; i < num_requests; ++i) { auto get_req = get_requests[index]; auto entry = GetObjectTableEntry(&store_info_, object_id); - ARROW_CHECK(entry != nullptr); + RAY_CHECK(entry != nullptr); PlasmaObject_init(&get_req->objects[object_id], entry); get_req->num_satisfied += 1; @@ -447,7 +447,7 @@ void PlasmaStore::ProcessGetRequest(Client* client, AddToClientObjectIds(object_id, entry, client); } else if (entry && entry->state == ObjectState::PLASMA_EVICTED) { // Make sure the object pointer is not already allocated - ARROW_CHECK(!entry->pointer); + RAY_CHECK(!entry->pointer); entry->pointer = AllocateMemory(entry->data_size + entry->metadata_size, /*evict=*/true, @@ -478,7 +478,7 @@ void PlasmaStore::ProcessGetRequest(Client* client, if (!evicted_ids.empty()) { std::vector> buffers; for (size_t i = 0; i < evicted_ids.size(); ++i) { - ARROW_CHECK(evicted_entries[i]->pointer != nullptr); + RAY_CHECK(evicted_entries[i]->pointer != nullptr); buffers.emplace_back(new arrow::MutableBuffer(evicted_entries[i]->pointer, evicted_entries[i]->data_size)); } @@ -550,7 +550,7 @@ void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) { PlasmaAllocator::Free(object->pointer, buff_size); } else { #ifdef PLASMA_CUDA - ARROW_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer)); + RAY_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer)); #endif } store_info_.objects.erase(object_id); @@ -558,9 +558,9 @@ void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) { void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) { auto entry = GetObjectTableEntry(&store_info_, object_id); - ARROW_CHECK(entry != nullptr); + RAY_CHECK(entry != nullptr); // Remove the client from the object's array of clients. - ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1); + RAY_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1); } // Check if an object is present. @@ -575,12 +575,12 @@ ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) { void PlasmaStore::SealObjects(const std::vector& object_ids) { std::vector infos; - ARROW_LOG(DEBUG) << "sealing " << object_ids.size() << " objects"; + RAY_LOG(DEBUG) << "sealing " << object_ids.size() << " objects"; for (size_t i = 0; i < object_ids.size(); ++i) { ObjectInfoT object_info; auto entry = GetObjectTableEntry(&store_info_, object_ids[i]); - ARROW_CHECK(entry != nullptr); - ARROW_CHECK(entry->state == ObjectState::PLASMA_CREATED); + RAY_CHECK(entry != nullptr); + RAY_CHECK(entry->state == ObjectState::PLASMA_CREATED); // Set the state of object to SEALED. entry->state = ObjectState::PLASMA_SEALED; // Set object construction duration. @@ -601,8 +601,8 @@ void PlasmaStore::SealObjects(const std::vector& object_ids) { int PlasmaStore::AbortObject(const ObjectID& object_id, Client* client) { auto entry = GetObjectTableEntry(&store_info_, object_id); - ARROW_CHECK(entry != nullptr) << "To abort an object it must be in the object table."; - ARROW_CHECK(entry->state != ObjectState::PLASMA_SEALED) + RAY_CHECK(entry != nullptr) << "To abort an object it must be in the object table."; + RAY_CHECK(entry->state != ObjectState::PLASMA_SEALED) << "To abort an object it must not have been sealed."; auto it = client->object_ids.find(object_id); if (it == client->object_ids.end()) { @@ -660,15 +660,15 @@ void PlasmaStore::EvictObjects(const std::vector& object_ids) { std::vector> evicted_object_data; std::vector evicted_entries; for (const auto& object_id : object_ids) { - ARROW_LOG(DEBUG) << "evicting object " << object_id.Hex(); + RAY_LOG(DEBUG) << "evicting object " << object_id.Hex(); auto entry = GetObjectTableEntry(&store_info_, object_id); // TODO(rkn): This should probably not fail, but should instead throw an // error. Maybe we should also support deleting objects that have been // created but not sealed. - ARROW_CHECK(entry != nullptr) << "To evict an object it must be in the object table."; - ARROW_CHECK(entry->state == ObjectState::PLASMA_SEALED) + RAY_CHECK(entry != nullptr) << "To evict an object it must be in the object table."; + RAY_CHECK(entry->state == ObjectState::PLASMA_SEALED) << "To evict an object it must have been sealed."; - ARROW_CHECK(entry->ref_count == 0) + RAY_CHECK(entry->ref_count == 0) << "To evict an object, there must be no clients currently using it."; // If there is a backing external store, then mark object for eviction to @@ -691,7 +691,7 @@ void PlasmaStore::EvictObjects(const std::vector& object_ids) { } if (external_store_ && !object_ids.empty()) { - ARROW_CHECK_OK(external_store_->Put(object_ids, evicted_object_data)); + RAY_ARROW_CHECK_OK(external_store_->Put(object_ids, evicted_object_data)); for (auto entry : evicted_entries) { PlasmaAllocator::Free(entry->pointer, entry->data_size + entry->metadata_size); entry->pointer = nullptr; @@ -711,20 +711,20 @@ void PlasmaStore::ConnectClient(int listener_sock) { loop_->AddFileEvent(client_fd, kEventLoopRead, [this, client](int events) { Status s = ProcessMessage(client); if (!s.ok()) { - ARROW_LOG(FATAL) << "Failed to process file event: " << s; + RAY_LOG(FATAL) << "Failed to process file event: " << s; } }); - ARROW_LOG(DEBUG) << "New connection with fd " << client_fd; + RAY_LOG(DEBUG) << "New connection with fd " << client_fd; } void PlasmaStore::DisconnectClient(int client_fd) { - ARROW_CHECK(client_fd > 0); + RAY_CHECK(client_fd > 0); auto it = connected_clients_.find(client_fd); - ARROW_CHECK(it != connected_clients_.end()); + RAY_CHECK(it != connected_clients_.end()); loop_->RemoveFileEvent(client_fd); // Close the socket. close(client_fd); - ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd; + RAY_LOG(INFO) << "Disconnecting client on fd " << client_fd; // Release all the objects that the client was using. auto client = it->second.get(); eviction_policy_.ClientDisconnected(client); @@ -794,10 +794,10 @@ PlasmaStore::NotificationMap::iterator PlasmaStore::SendNotifications( // Attempt to send a notification about this object ID. ssize_t nbytes = send(client_fd, notification.get(), sizeof(int64_t) + size, 0); if (nbytes >= 0) { - ARROW_CHECK(nbytes == static_cast(sizeof(int64_t)) + size); + RAY_CHECK(nbytes == static_cast(sizeof(int64_t)) + size); } else if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { - ARROW_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this " + RAY_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this " "notification and will send it later."; // Add a callback to the event loop to send queued notifications whenever // there is room in the socket's send buffer. Callbacks can be added @@ -810,7 +810,7 @@ PlasmaStore::NotificationMap::iterator PlasmaStore::SendNotifications( }); break; } else { - ARROW_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd; + RAY_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd; if (errno == EPIPE) { closed = true; break; @@ -868,7 +868,7 @@ void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info, int client_fd) // Subscribe to notifications about sealed objects. void PlasmaStore::SubscribeToUpdates(Client* client) { - ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd; + RAY_LOG(DEBUG) << "subscribing to updates on fd " << client->fd; if (client->notification_fd > 0) { // This client has already subscribed. Return. return; @@ -879,7 +879,7 @@ void PlasmaStore::SubscribeToUpdates(Client* client) { int fd = recv_fd(client->fd); if (fd < 0) { // This may mean that the client died before sending the file descriptor. - ARROW_LOG(WARNING) << "Failed to receive file descriptor from client on fd " + RAY_LOG(WARNING) << "Failed to receive file descriptor from client on fd " << client->fd << "."; return; } @@ -903,7 +903,7 @@ void PlasmaStore::SubscribeToUpdates(Client* client) { Status PlasmaStore::ProcessMessage(Client* client) { fb::MessageType type; Status s = ReadMessage(client->fd, &type, &input_buffer_); - ARROW_CHECK(s.ok() || s.IsIOError()); + RAY_CHECK(s.ok() || s.IsIOError()); uint8_t* input = input_buffer_.data(); size_t input_size = input_buffer_.size(); @@ -951,7 +951,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { // If the object was successfully created, fill out the object data and seal it. if (error_code == PlasmaError::OK) { auto entry = GetObjectTableEntry(&store_info_, object_id); - ARROW_CHECK(entry != nullptr); + RAY_CHECK(entry != nullptr); // Write the inlined data and metadata into the allocated object. std::memcpy(entry->pointer, data.data(), data.size()); std::memcpy(entry->pointer + data.size(), metadata.data(), metadata.size()); @@ -960,7 +960,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { // object is not being used by any client. The client was added to the // object's array of clients in CreateObject. This is analogous to the // Release call that happens in the client's Seal method. - ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1); + RAY_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1); } // Reply to the client. @@ -993,7 +993,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { if (error_code == PlasmaError::OK) { for (i = 0; i < object_ids.size(); i++) { auto entry = GetObjectTableEntry(&store_info_, object_ids[i]); - ARROW_CHECK(entry != nullptr); + RAY_CHECK(entry != nullptr); // Write the inlined data and metadata into the allocated object. std::memcpy(entry->pointer, data[i].data(), data[i].size()); std::memcpy(entry->pointer + data[i].size(), metadata[i].data(), @@ -1007,7 +1007,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { // Release call that happens in the client's Seal method. for (i = 0; i < object_ids.size(); i++) { auto entry = GetObjectTableEntry(&store_info_, object_ids[i]); - ARROW_CHECK(RemoveFromClientObjectIds(object_ids[i], entry, client) == 1); + RAY_CHECK(RemoveFromClientObjectIds(object_ids[i], entry, client) == 1); } } else { for (size_t j = 0; j < i; j++) { @@ -1019,7 +1019,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { } break; case fb::MessageType::PlasmaAbortRequest: { RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id)); - ARROW_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only " + RAY_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only " "client currently using it " "must be the creator."; HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd); @@ -1081,7 +1081,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { client->fd); } break; case fb::MessageType::PlasmaDisconnectClient: - ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd; + RAY_LOG(DEBUG) << "Disconnecting client on fd " << client->fd; DisconnectClient(client->fd); break; case fb::MessageType::PlasmaSetOptionsRequest: { @@ -1101,7 +1101,7 @@ Status PlasmaStore::ProcessMessage(Client* client) { } break; default: // This code should be unreachable. - ARROW_CHECK(0); + RAY_CHECK(0); } return Status::OK(); } diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc index aec102b2b..1d369e525 100644 --- a/src/ray/object_manager/plasma/store_runner.cc +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -11,31 +11,27 @@ namespace plasma { -using arrow::util::ArrowLog; -using arrow::util::ArrowLogLevel; - void SetMallocGranularity(int value); PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_memory, bool hugepages_enabled, std::string plasma_directory, const std::string external_store_endpoint): hugepages_enabled_(hugepages_enabled), external_store_endpoint_(external_store_endpoint) { - ArrowLog::StartArrowLog("plasma_store", ArrowLogLevel::ARROW_INFO); // Sanity check. if (socket_name.empty()) { - ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch"; + RAY_LOG(FATAL) << "please specify socket for incoming connections with -s switch"; } socket_name_ = socket_name; if (system_memory == -1) { - ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch"; + RAY_LOG(FATAL) << "please specify the amount of system memory with -m switch"; } // Set system memory capacity PlasmaAllocator::SetFootprintLimit(static_cast(system_memory)); - ARROW_LOG(INFO) << "Allowing the Plasma store to use up to " + RAY_LOG(INFO) << "Allowing the Plasma store to use up to " << static_cast(system_memory) / 1000000000 << "GB of memory."; if (hugepages_enabled && plasma_directory.empty()) { - ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages " + RAY_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages " "filesystem with -d"; } if (plasma_directory.empty()) { @@ -45,7 +41,7 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_mem plasma_directory = "/tmp"; #endif } - ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory + RAY_LOG(INFO) << "Starting object store with directory " << plasma_directory << " and huge page support " << (hugepages_enabled ? "enabled" : "disabled"); #ifdef __linux__ @@ -62,7 +58,7 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_mem // Keep some safety margin for allocator fragmentation. shm_mem_avail = 9 * shm_mem_avail / 10; if (system_memory > shm_mem_avail) { - ARROW_LOG(WARNING) + RAY_LOG(WARNING) << "System memory request exceeds memory available in " << plasma_directory << ". The request is for " << system_memory << " bytes, and the amount available is " << shm_mem_avail @@ -88,16 +84,16 @@ void PlasmaStoreRunner::Start() { std::shared_ptr external_store{nullptr}; if (!external_store_endpoint_.empty()) { std::string name; - ARROW_CHECK_OK( + RAY_ARROW_CHECK_OK( plasma::ExternalStores::ExtractStoreName(external_store_endpoint_, &name)); external_store = plasma::ExternalStores::GetStore(name); if (external_store == nullptr) { - ARROW_LOG(FATAL) << "No such external store \"" << name << "\""; + RAY_LOG(FATAL) << "No such external store \"" << name << "\""; } - ARROW_LOG(DEBUG) << "connecting to external store..."; - ARROW_CHECK_OK(external_store->Connect(external_store_endpoint_)); + RAY_LOG(DEBUG) << "connecting to external store..."; + RAY_ARROW_CHECK_OK(external_store->Connect(external_store_endpoint_)); } - ARROW_LOG(DEBUG) << "starting server listening on " << socket_name_; + RAY_LOG(DEBUG) << "starting server listening on " << socket_name_; // Create the event loop. loop_.reset(new EventLoop); @@ -111,7 +107,7 @@ void PlasmaStoreRunner::Start() { // bookkeeping. void* pointer = PlasmaAllocator::Memalign( kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); - ARROW_CHECK(pointer != nullptr); + RAY_CHECK(pointer != nullptr); // This will unmap the file, but the next one created will be as large // as this one (this is an implementation detail of dlmalloc). PlasmaAllocator::Free( @@ -119,7 +115,7 @@ void PlasmaStoreRunner::Start() { int socket = ConnectOrListenIpcSock(socket_name_, true); // TODO(pcm): Check return value. - ARROW_CHECK(socket >= 0); + RAY_CHECK(socket >= 0); loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) { this->store_->ConnectClient(socket); @@ -130,14 +126,13 @@ void PlasmaStoreRunner::Start() { #ifdef _WINSOCKAPI_ WSACleanup(); #endif - ArrowLog::ShutDownArrowLog(); } void PlasmaStoreRunner::Stop() { if (loop_) { loop_->Stop(); } else { - ARROW_LOG(ERROR) << "Expected loop_ to be non-NULL; this may be a bug"; + RAY_LOG(ERROR) << "Expected loop_ to be non-NULL; this may be a bug"; } } diff --git a/src/ray/object_manager/plasma/test/client_tests.cc b/src/ray/object_manager/plasma/test/client_tests.cc deleted file mode 100644 index a0672f7b4..000000000 --- a/src/ray/object_manager/plasma/test/client_tests.cc +++ /dev/null @@ -1,1084 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include -#include -#include - -#include -#include - -#include - -#include "arrow/testing/gtest_util.h" -#include "arrow/util/io_util.h" - -#include "ray/object_manager/plasma/client.h" -#include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/plasma.h" -#include "ray/object_manager/plasma/protocol.h" -#include "ray/object_manager/plasma/test_util.h" - -namespace plasma { - -using arrow::internal::TemporaryDir; - -std::string test_executable; // NOLINT - -void AssertObjectBufferEqual(const ObjectBuffer& object_buffer, - const std::vector& metadata, - const std::vector& data) { - arrow::AssertBufferEqual(*object_buffer.metadata, metadata); - arrow::AssertBufferEqual(*object_buffer.data, data); -} - -class TestPlasmaStore : public ::testing::Test { - public: - // TODO(pcm): At the moment, stdout of the test gets mixed up with - // stdout of the object store. Consider changing that. - - void SetUp() { - ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("cli-test-")); - store_socket_name_ = temp_dir_->path().ToString() + "store"; - - std::string plasma_directory = - test_executable.substr(0, test_executable.find_last_of("/")); - std::string plasma_command = - plasma_directory + "/plasma-store-server -m 10000000 -s " + store_socket_name_ + - " 1> /dev/null 2> /dev/null & " + "echo $! > " + store_socket_name_ + ".pid"; - PLASMA_CHECK_SYSTEM(system(plasma_command.c_str())); - ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(client2_.Connect(store_socket_name_, "")); - } - - virtual void TearDown() { - ARROW_CHECK_OK(client_.Disconnect()); - ARROW_CHECK_OK(client2_.Disconnect()); - // Kill plasma_store process that we started -#ifdef COVERAGE_BUILD - // Ask plasma_store to exit gracefully and give it time to write out - // coverage files - std::string plasma_term_command = - "kill -TERM `cat " + store_socket_name_ + ".pid` || exit 0"; - PLASMA_CHECK_SYSTEM(system(plasma_term_command.c_str())); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); -#endif - std::string plasma_kill_command = - "kill -KILL `cat " + store_socket_name_ + ".pid` || exit 0"; - PLASMA_CHECK_SYSTEM(system(plasma_kill_command.c_str())); - } - - void CreateObject(PlasmaClient& client, const ObjectID& object_id, - const std::vector& metadata, - const std::vector& data, bool release = true) { - std::shared_ptr data_buffer; - ARROW_CHECK_OK(client.Create(object_id, data.size(), metadata.data(), metadata.size(), - &data_buffer)); - for (size_t i = 0; i < data.size(); i++) { - data_buffer->mutable_data()[i] = data[i]; - } - ARROW_CHECK_OK(client.Seal(object_id)); - if (release) { - ARROW_CHECK_OK(client.Release(object_id)); - } - } - - protected: - PlasmaClient client_; - PlasmaClient client2_; - std::unique_ptr temp_dir_; - std::string store_socket_name_; -}; - -TEST_F(TestPlasmaStore, NewSubscriberTest) { - PlasmaClient local_client, local_client2; - - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client2.Connect(store_socket_name_, "")); - - ObjectID object_id = random_object_id(); - - // Test for the object being in local Plasma store. - // First create object. - int64_t data_size = 100; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data; - ARROW_CHECK_OK( - local_client.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(local_client.Seal(object_id)); - - // Test that new subscriber client2 can receive notifications about existing objects. - int fd = -1; - ARROW_CHECK_OK(local_client2.Subscribe(&fd)); - ASSERT_GT(fd, 0); - - ObjectID object_id2 = random_object_id(); - int64_t data_size2 = 0; - int64_t metadata_size2 = 0; - ARROW_CHECK_OK( - local_client2.GetNotification(fd, &object_id2, &data_size2, &metadata_size2)); - ASSERT_EQ(object_id, object_id2); - ASSERT_EQ(data_size, data_size2); - ASSERT_EQ(metadata_size, metadata_size2); - - // Delete the object. - ARROW_CHECK_OK(local_client.Release(object_id)); - ARROW_CHECK_OK(local_client.Delete(object_id)); - - ARROW_CHECK_OK( - local_client2.GetNotification(fd, &object_id2, &data_size2, &metadata_size2)); - ASSERT_EQ(object_id, object_id2); - ASSERT_EQ(-1, data_size2); - ASSERT_EQ(-1, metadata_size2); - - ARROW_CHECK_OK(local_client2.Disconnect()); - ARROW_CHECK_OK(local_client.Disconnect()); -} - -TEST_F(TestPlasmaStore, BatchNotificationTest) { - PlasmaClient local_client, local_client2; - - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client2.Connect(store_socket_name_, "")); - - int fd = -1; - ARROW_CHECK_OK(local_client2.Subscribe(&fd)); - ASSERT_GT(fd, 0); - - ObjectID object_id1 = random_object_id(); - ObjectID object_id2 = random_object_id(); - - std::vector object_ids = {object_id1, object_id2}; - - std::vector data = {"hello", "world!"}; - std::vector metadata = {"1", "23"}; - ARROW_CHECK_OK(local_client.CreateAndSealBatch(object_ids, data, metadata)); - - ObjectID object_id = random_object_id(); - int64_t data_size = 0; - int64_t metadata_size = 0; - ARROW_CHECK_OK( - local_client2.GetNotification(fd, &object_id, &data_size, &metadata_size)); - ASSERT_EQ(object_id, object_id1); - ASSERT_EQ(data_size, 5); - ASSERT_EQ(metadata_size, 1); - - ARROW_CHECK_OK( - local_client2.GetNotification(fd, &object_id, &data_size, &metadata_size)); - ASSERT_EQ(object_id, object_id2); - ASSERT_EQ(data_size, 6); - ASSERT_EQ(metadata_size, 2); - - ARROW_CHECK_OK(local_client2.Disconnect()); - ARROW_CHECK_OK(local_client.Disconnect()); -} - -TEST_F(TestPlasmaStore, SealErrorsTest) { - ObjectID object_id = random_object_id(); - - Status result = client_.Seal(object_id); - ASSERT_TRUE(IsPlasmaObjectNonexistent(result)); - - // Create object. - std::vector data(100, 0); - CreateObject(client_, object_id, {42}, data, false); - - // Trying to seal it again. - result = client_.Seal(object_id); - ASSERT_TRUE(IsPlasmaObjectAlreadySealed(result)); - ARROW_CHECK_OK(client_.Release(object_id)); -} - -TEST_F(TestPlasmaStore, SetQuotaBasicTest) { - bool has_object = false; - ObjectID id1 = random_object_id(); - ObjectID id2 = random_object_id(); - - ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024)); - std::vector big_data(3 * 1024 * 1024, 0); - - // First object fits - CreateObject(client_, id1, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_TRUE(has_object); - - // Evicts first object - CreateObject(client_, id2, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); - ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_FALSE(has_object); - - // Too big to fit in quota at all - std::shared_ptr data_buffer; - ASSERT_FALSE( - client_.Create(random_object_id(), 7 * 1024 * 1024, {}, 0, &data_buffer).ok()); - ASSERT_TRUE( - client_.Create(random_object_id(), 4 * 1024 * 1024, {}, 0, &data_buffer).ok()); -} - -TEST_F(TestPlasmaStore, SetQuotaProvidesIsolationFromOtherClients) { - bool has_object = false; - ObjectID id1 = random_object_id(); - ObjectID id2 = random_object_id(); - - std::vector big_data(3 * 1024 * 1024, 0); - - // First object, created without quota - CreateObject(client_, id1, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_TRUE(has_object); - - // Second client creates a bunch of objects - for (int i = 0; i < 10; i++) { - CreateObject(client2_, random_object_id(), {42}, big_data, true); - } - - // First client's object is evicted - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_FALSE(has_object); - - // Try again with quota enabled - ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024)); - CreateObject(client_, id2, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); - ASSERT_TRUE(has_object); - - // Second client creates a bunch of objects - for (int i = 0; i < 10; i++) { - CreateObject(client2_, random_object_id(), {42}, big_data, true); - } - - // First client's object is not evicted - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); - ASSERT_TRUE(has_object); -} - -TEST_F(TestPlasmaStore, SetQuotaProtectsOtherClients) { - bool has_object = false; - ObjectID id1 = random_object_id(); - - std::vector big_data(3 * 1024 * 1024, 0); - - // First client has no quota - CreateObject(client_, id1, {42}, big_data, true); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_TRUE(has_object); - - // Second client creates a bunch of objects under a quota - ARROW_CHECK_OK(client2_.SetClientOptions("client2", 5 * 1024 * 1024)); - for (int i = 0; i < 10; i++) { - CreateObject(client2_, random_object_id(), {42}, big_data, true); - } - - // First client's object is NOT evicted - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_TRUE(has_object); -} - -TEST_F(TestPlasmaStore, SetQuotaCannotExceedSeventyPercentMemory) { - ASSERT_FALSE(client_.SetClientOptions("client1", 8 * 1024 * 1024).ok()); - ASSERT_TRUE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok()); - // cannot set quota twice - ASSERT_FALSE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok()); - // cannot exceed 70% summed - ASSERT_FALSE(client2_.SetClientOptions("client2", 3 * 1024 * 1024).ok()); - ASSERT_TRUE(client2_.SetClientOptions("client2", 1 * 1024 * 1024).ok()); -} - -TEST_F(TestPlasmaStore, SetQuotaDemotesPinnedObjectsToGlobalLRU) { - bool has_object = false; - ASSERT_TRUE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok()); - - ObjectID id1 = random_object_id(); - ObjectID id2 = random_object_id(); - std::vector big_data(3 * 1024 * 1024, 0); - - // Quota is not enough to fit both id1 and id2, but global LRU is - CreateObject(client_, id1, {42}, big_data, false); - CreateObject(client_, id2, {42}, big_data, false); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); - ASSERT_TRUE(has_object); - - // Release both objects. Now id1 is in global LRU and id2 is in quota - ARROW_CHECK_OK(client_.Release(id1)); - ARROW_CHECK_OK(client_.Release(id2)); - - // This flushes id1 from the object store - for (int i = 0; i < 10; i++) { - CreateObject(client2_, random_object_id(), {42}, big_data, true); - } - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Contains(id2, &has_object)); - ASSERT_TRUE(has_object); -} - -TEST_F(TestPlasmaStore, SetQuotaDemoteDisconnectToGlobalLRU) { - bool has_object = false; - PlasmaClient local_client; - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); - - ObjectID id1 = random_object_id(); - std::vector big_data(3 * 1024 * 1024, 0); - - // First object fits - CreateObject(local_client, id1, {42}, big_data, true); - for (int i = 0; i < 10; i++) { - CreateObject(client_, random_object_id(), {42}, big_data, true); - } - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_TRUE(has_object); - - // Object is still present after disconnect - ARROW_CHECK_OK(local_client.Disconnect()); - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_TRUE(has_object); - - // But is eligible for global LRU - for (int i = 0; i < 10; i++) { - CreateObject(client_, random_object_id(), {42}, big_data, true); - } - ARROW_CHECK_OK(client_.Contains(id1, &has_object)); - ASSERT_FALSE(has_object); -} - -TEST_F(TestPlasmaStore, SetQuotaCleanupObjectMetadata) { - PlasmaClient local_client; - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); - - ObjectID id0 = random_object_id(); - ObjectID id1 = random_object_id(); - ObjectID id2 = random_object_id(); - ObjectID id3 = random_object_id(); - std::vector big_data(3 * 1024 * 1024, 0); - std::vector small_data(1 * 1024 * 1024, 0); - CreateObject(local_client, id0, {42}, small_data, false); - CreateObject(local_client, id1, {42}, big_data, true); - CreateObject(local_client, id2, {42}, big_data, - true); // spills id0 to global, evicts id1 - CreateObject(local_client, id3, {42}, small_data, false); - - ASSERT_TRUE(client_.DebugString().find("num clients with quota: 1") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("quota map size: 2") != std::string::npos); - ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 1") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 0") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("(local) num objects: 2") != std::string::npos); - - // release id0 - ARROW_CHECK_OK(local_client.Release(id0)); - ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 1") != - std::string::npos); - - // delete everything - ARROW_CHECK_OK(local_client.Delete(id0)); - ARROW_CHECK_OK(local_client.Delete(id2)); - ARROW_CHECK_OK(local_client.Delete(id3)); - ARROW_CHECK_OK(local_client.Release(id3)); - ASSERT_TRUE(client_.DebugString().find("quota map size: 0") != std::string::npos); - ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 0") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 0") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("(local) num objects: 0") != std::string::npos); - - ARROW_CHECK_OK(local_client.Disconnect()); - int tries = 10; // wait for disconnect to complete - while (tries > 0 && - client_.DebugString().find("num clients with quota: 0") == std::string::npos) { - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - tries -= 1; - } - ASSERT_TRUE(client_.DebugString().find("num clients with quota: 0") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("(global lru) capacity: 10000000") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("(global lru) used: 0%") != std::string::npos); -} - -TEST_F(TestPlasmaStore, SetQuotaCleanupClientDisconnect) { - PlasmaClient local_client; - ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); - ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); - - ObjectID id1 = random_object_id(); - ObjectID id2 = random_object_id(); - ObjectID id3 = random_object_id(); - std::vector big_data(3 * 1024 * 1024, 0); - std::vector small_data(1 * 1024 * 1024, 0); - CreateObject(local_client, id1, {42}, big_data, true); - CreateObject(local_client, id2, {42}, big_data, true); - CreateObject(local_client, id3, {42}, small_data, false); - - ARROW_CHECK_OK(local_client.Disconnect()); - int tries = 10; // wait for disconnect to complete - while (tries > 0 && - client_.DebugString().find("num clients with quota: 0") == std::string::npos) { - std::this_thread::sleep_for(std::chrono::milliseconds(200)); - tries -= 1; - } - ASSERT_TRUE(client_.DebugString().find("num clients with quota: 0") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("quota map size: 0") != std::string::npos); - ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 0") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 2") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("(global lru) capacity: 10000000") != - std::string::npos); - ASSERT_TRUE(client_.DebugString().find("(global lru) used: 41.9431%") != - std::string::npos); -} - -TEST_F(TestPlasmaStore, RefreshLRUTest) { - bool has_object = false; - std::vector object_ids; - - for (int i = 0; i < 10; ++i) { - object_ids.push_back(random_object_id()); - } - - std::vector small_data(1 * 1000 * 1000, 0); - - // we can fit ten small objects into the store - for (const auto& object_id : object_ids) { - CreateObject(client_, object_id, {}, small_data, true); - ARROW_CHECK_OK(client_.Contains(object_ids[0], &has_object)); - ASSERT_TRUE(has_object); - } - - ObjectID id = random_object_id(); - CreateObject(client_, id, {}, small_data, true); - - // the first two objects got evicted (20% of the store) - ARROW_CHECK_OK(client_.Contains(object_ids[0], &has_object)); - ASSERT_FALSE(has_object); - - ARROW_CHECK_OK(client_.Contains(object_ids[1], &has_object)); - ASSERT_FALSE(has_object); - - ARROW_CHECK_OK(client_.Refresh({object_ids[2], object_ids[3]})); - - id = random_object_id(); - CreateObject(client_, id, {}, small_data, true); - id = random_object_id(); - CreateObject(client_, id, {}, small_data, true); - - // the refreshed objects are not evicted - ARROW_CHECK_OK(client_.Contains(object_ids[2], &has_object)); - ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(object_ids[3], &has_object)); - ASSERT_TRUE(has_object); - - // the next object in LRU order is evicted - ARROW_CHECK_OK(client_.Contains(object_ids[4], &has_object)); - ASSERT_FALSE(has_object); -} - -TEST_F(TestPlasmaStore, DeleteTest) { - ObjectID object_id = random_object_id(); - - // Test for deleting non-existence object. - Status result = client_.Delete(object_id); - ARROW_CHECK_OK(result); - - // Test for the object being in local Plasma store. - // First create object. - int64_t data_size = 100; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Seal(object_id)); - - result = client_.Delete(object_id); - ARROW_CHECK_OK(result); - bool has_object = false; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_TRUE(has_object); - - ARROW_CHECK_OK(client_.Release(object_id)); - // object_id is marked as to-be-deleted, when it is not in use, it will be deleted. - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Delete(object_id)); -} - -TEST_F(TestPlasmaStore, DeleteObjectsTest) { - ObjectID object_id1 = random_object_id(); - ObjectID object_id2 = random_object_id(); - - // Test for deleting non-existence object. - Status result = client_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(result); - // Test for the object being in local Plasma store. - // First create object. - int64_t data_size = 100; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Seal(object_id1)); - ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Seal(object_id2)); - // Release the ref count of Create function. - ARROW_CHECK_OK(client_.Release(object_id1)); - ARROW_CHECK_OK(client_.Release(object_id2)); - // Increase the ref count by calling Get using client2_. - std::vector object_buffers; - ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers)); - // Objects are still used by client2_. - result = client_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(result); - // The object is used and it should not be deleted right now. - bool has_object = false; - ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); - ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(object_id2, &has_object)); - ASSERT_TRUE(has_object); - // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer). - // client2_ won't send the release request immediately because the trigger - // condition is not reached. The release is only added to release cache. - object_buffers.clear(); - // Delete the objects. - result = client2_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); - ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Contains(object_id2, &has_object)); - ASSERT_FALSE(has_object); -} - -TEST_F(TestPlasmaStore, ContainsTest) { - ObjectID object_id = random_object_id(); - - // Test for object non-existence. - bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_FALSE(has_object); - - // Test for the object being in local Plasma store. - // First create object. - std::vector data(100, 0); - CreateObject(client_, object_id, {42}, data); - std::vector object_buffers; - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_TRUE(has_object); -} - -TEST_F(TestPlasmaStore, GetTest) { - std::vector object_buffers; - - ObjectID object_id = random_object_id(); - - // Test for object non-existence. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); - ASSERT_EQ(object_buffers.size(), 1); - ASSERT_FALSE(object_buffers[0].metadata); - ASSERT_FALSE(object_buffers[0].data); - EXPECT_FALSE(client_.IsInUse(object_id)); - - // Test for the object being in local Plasma store. - // First create object. - std::vector data = {3, 5, 6, 7, 9}; - CreateObject(client_, object_id, {42}, data); - EXPECT_FALSE(client_.IsInUse(object_id)); - - object_buffers.clear(); - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); - ASSERT_EQ(object_buffers.size(), 1); - ASSERT_EQ(object_buffers[0].device_num, 0); - AssertObjectBufferEqual(object_buffers[0], {42}, {3, 5, 6, 7, 9}); - - // Metadata keeps object in use - { - auto metadata = object_buffers[0].metadata; - object_buffers.clear(); - ::arrow::AssertBufferEqual(*metadata, std::string{42}); - EXPECT_TRUE(client_.IsInUse(object_id)); - } - // Object is automatically released - EXPECT_FALSE(client_.IsInUse(object_id)); -} - -TEST_F(TestPlasmaStore, LegacyGetTest) { - // Test for old non-releasing Get() variant - ObjectID object_id = random_object_id(); - { - ObjectBuffer object_buffer; - - // Test for object non-existence. - ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer)); - ASSERT_FALSE(object_buffer.metadata); - ASSERT_FALSE(object_buffer.data); - EXPECT_FALSE(client_.IsInUse(object_id)); - - // First create object. - std::vector data = {3, 5, 6, 7, 9}; - CreateObject(client_, object_id, {42}, data); - EXPECT_FALSE(client_.IsInUse(object_id)); - - ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer)); - AssertObjectBufferEqual(object_buffer, {42}, {3, 5, 6, 7, 9}); - } - // Object needs releasing manually - EXPECT_TRUE(client_.IsInUse(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); - EXPECT_FALSE(client_.IsInUse(object_id)); -} - -TEST_F(TestPlasmaStore, MultipleGetTest) { - ObjectID object_id1 = random_object_id(); - ObjectID object_id2 = random_object_id(); - std::vector object_ids = {object_id1, object_id2}; - std::vector object_buffers; - - int64_t data_size = 4; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data)); - data->mutable_data()[0] = 1; - ARROW_CHECK_OK(client_.Seal(object_id1)); - - ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data)); - data->mutable_data()[0] = 2; - ARROW_CHECK_OK(client_.Seal(object_id2)); - - ARROW_CHECK_OK(client_.Get(object_ids, -1, &object_buffers)); - ASSERT_EQ(object_buffers[0].data->data()[0], 1); - ASSERT_EQ(object_buffers[1].data->data()[0], 2); -} - -TEST_F(TestPlasmaStore, BatchCreateTest) { - ObjectID object_id1 = random_object_id(); - ObjectID object_id2 = random_object_id(); - std::vector object_ids = {object_id1, object_id2}; - - std::vector data = {"hello", "world"}; - std::vector metadata = {"1", "2"}; - - ARROW_CHECK_OK(client_.CreateAndSealBatch(object_ids, data, metadata)); - - std::vector object_buffers; - - ARROW_CHECK_OK(client_.Get(object_ids, -1, &object_buffers)); - - std::string out1, out2; - out1.assign(reinterpret_cast(object_buffers[0].data->data()), - object_buffers[0].data->size()); - out2.assign(reinterpret_cast(object_buffers[1].data->data()), - object_buffers[1].data->size()); - - ASSERT_STREQ(out1.c_str(), "hello"); - ASSERT_STREQ(out2.c_str(), "world"); -} - -TEST_F(TestPlasmaStore, AbortTest) { - ObjectID object_id = random_object_id(); - std::vector object_buffers; - - // Test for object non-existence. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); - ASSERT_FALSE(object_buffers[0].data); - - // Test object abort. - // First create object. - int64_t data_size = 4; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data; - uint8_t* data_ptr; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); - data_ptr = data->mutable_data(); - // Write some data. - for (int64_t i = 0; i < data_size / 2; i++) { - data_ptr[i] = static_cast(i % 4); - } - // Attempt to abort. Test that this fails before the first release. - Status status = client_.Abort(object_id); - ASSERT_TRUE(status.IsInvalid()); - // Release, then abort. - ARROW_CHECK_OK(client_.Release(object_id)); - EXPECT_TRUE(client_.IsInUse(object_id)); - - ARROW_CHECK_OK(client_.Abort(object_id)); - EXPECT_FALSE(client_.IsInUse(object_id)); - - // Test for object non-existence after the abort. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); - ASSERT_FALSE(object_buffers[0].data); - - // Create the object successfully this time. - CreateObject(client_, object_id, {42, 43}, {1, 2, 3, 4, 5}); - - // Test that we can get the object. - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); - AssertObjectBufferEqual(object_buffers[0], {42, 43}, {1, 2, 3, 4, 5}); -} - -TEST_F(TestPlasmaStore, OneIdCreateRepeatedlyTest) { - const int64_t loop_times = 5; - - ObjectID object_id = random_object_id(); - std::vector object_buffers; - - // Test for object non-existence. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); - ASSERT_FALSE(object_buffers[0].data); - - int64_t data_size = 20; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - - // Test the sequence: create -> release -> abort -> ... - for (int64_t i = 0; i < loop_times; i++) { - std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.Abort(object_id)); - } - - // Test the sequence: create -> seal -> release -> delete -> ... - for (int64_t i = 0; i < loop_times; i++) { - std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client_.Seal(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.Delete(object_id)); - } -} - -TEST_F(TestPlasmaStore, MultipleClientTest) { - ObjectID object_id = random_object_id(); - std::vector object_buffers; - - // Test for object non-existence on the first client. - bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_FALSE(has_object); - - // Test for the object being in local Plasma store. - // First create and seal object on the second client. - int64_t data_size = 100; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data; - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data)); - ARROW_CHECK_OK(client2_.Seal(object_id)); - // Test that the first client can get the object. - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); - ASSERT_TRUE(object_buffers[0].data); - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_TRUE(has_object); - - // Test that one client disconnecting does not interfere with the other. - // First create object on the second client. - object_id = random_object_id(); - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data)); - // Disconnect the first client. - ARROW_CHECK_OK(client_.Disconnect()); - // Test that the second client can seal and get the created object. - ARROW_CHECK_OK(client2_.Seal(object_id)); - ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers)); - ASSERT_TRUE(object_buffers[0].data); - ARROW_CHECK_OK(client2_.Contains(object_id, &has_object)); - ASSERT_TRUE(has_object); -} - -TEST_F(TestPlasmaStore, ManyObjectTest) { - // Create many objects on the first client. Seal one third, abort one third, - // and leave the last third unsealed. - std::vector object_ids; - for (int i = 0; i < 100; i++) { - ObjectID object_id = random_object_id(); - object_ids.push_back(object_id); - - // Test for object non-existence on the first client. - bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_FALSE(has_object); - - // Test for the object being in local Plasma store. - // First create and seal object on the first client. - int64_t data_size = 100; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data)); - - if (i % 3 == 0) { - // Seal one third of the objects. - ARROW_CHECK_OK(client_.Seal(object_id)); - // Test that the first client can get the object. - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_TRUE(has_object); - } else if (i % 3 == 1) { - // Abort one third of the objects. - ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.Abort(object_id)); - } - } - // Disconnect the first client. All unsealed objects should be aborted. - ARROW_CHECK_OK(client_.Disconnect()); - - // Check that the second client can query the object store for the first - // client's objects. - int i = 0; - for (auto const& object_id : object_ids) { - bool has_object; - ARROW_CHECK_OK(client2_.Contains(object_id, &has_object)); - if (i % 3 == 0) { - // The first third should be sealed. - ASSERT_TRUE(has_object); - } else { - // The rest were aborted, so the object is not in the store. - ASSERT_FALSE(has_object); - } - i++; - } -} - -#ifdef PLASMA_CUDA -using arrow::cuda::CudaBuffer; -using arrow::cuda::CudaBufferReader; -using arrow::cuda::CudaBufferWriter; - -// actual CUDA device number + 1 -constexpr int kGpuDeviceNumber = 1; - -namespace { - -void AssertCudaRead(const std::shared_ptr& buffer, - const std::vector& expected_data) { - std::shared_ptr gpu_buffer; - const size_t data_size = expected_data.size(); - - ASSERT_OK_AND_ASSIGN(gpu_buffer, CudaBuffer::FromBuffer(buffer)); - ASSERT_EQ(gpu_buffer->size(), data_size); - - CudaBufferReader reader(gpu_buffer); - std::vector read_data(data_size); - ASSERT_OK_AND_EQ(data_size, reader.Read(data_size, read_data.data())); - - for (size_t i = 0; i < data_size; i++) { - ASSERT_EQ(read_data[i], expected_data[i]); - } -} - -} // namespace - -TEST_F(TestPlasmaStore, GetGPUTest) { - ObjectID object_id = random_object_id(); - std::vector object_buffers; - - // Test for object non-existence. - ARROW_CHECK_OK(client_.Get({object_id}, 0, &object_buffers)); - ASSERT_EQ(object_buffers.size(), 1); - ASSERT_FALSE(object_buffers[0].data); - - // Test for the object being in local Plasma store. - // First create object. - uint8_t data[] = {4, 5, 3, 1}; - int64_t data_size = sizeof(data); - uint8_t metadata[] = {42}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data_buffer; - std::shared_ptr gpu_buffer; - ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, - &data_buffer, kGpuDeviceNumber)); - ASSERT_OK_AND_ASSIGN(gpu_buffer, CudaBuffer::FromBuffer(data_buffer)); - CudaBufferWriter writer(gpu_buffer); - ARROW_CHECK_OK(writer.Write(data, data_size)); - ARROW_CHECK_OK(client_.Seal(object_id)); - - object_buffers.clear(); - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); - ASSERT_EQ(object_buffers.size(), 1); - ASSERT_EQ(object_buffers[0].device_num, kGpuDeviceNumber); - // Check data - AssertCudaRead(object_buffers[0].data, {4, 5, 3, 1}); - // Check metadata - AssertCudaRead(object_buffers[0].metadata, {42}); -} - -TEST_F(TestPlasmaStore, DeleteObjectsGPUTest) { - ObjectID object_id1 = random_object_id(); - ObjectID object_id2 = random_object_id(); - - // Test for deleting non-existence object. - Status result = client_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(result); - // Test for the object being in local Plasma store. - // First create object. - int64_t data_size = 100; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id1)); - ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id2)); - // Release the ref count of Create function. - ARROW_CHECK_OK(client_.Release(object_id1)); - ARROW_CHECK_OK(client_.Release(object_id2)); - // Increase the ref count by calling Get using client2_. - std::vector object_buffers; - ARROW_CHECK_OK(client2_.Get({object_id1, object_id2}, 0, &object_buffers)); - // Objects are still used by client2_. - result = client_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(result); - // The object is used and it should not be deleted right now. - bool has_object = false; - ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); - ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client_.Contains(object_id2, &has_object)); - ASSERT_TRUE(has_object); - // Decrease the ref count by deleting the PlasmaBuffer (in ObjectBuffer). - // client2_ won't send the release request immediately because the trigger - // condition is not reached. The release is only added to release cache. - object_buffers.clear(); - // Delete the objects. - result = client2_.Delete(std::vector{object_id1, object_id2}); - ARROW_CHECK_OK(client_.Contains(object_id1, &has_object)); - ASSERT_FALSE(has_object); - ARROW_CHECK_OK(client_.Contains(object_id2, &has_object)); - ASSERT_FALSE(has_object); -} - -TEST_F(TestPlasmaStore, RepeatlyCreateGPUTest) { - const int64_t loop_times = 100; - const int64_t object_num = 5; - const int64_t data_size = 40; - - std::vector object_ids; - - // create new gpu objects - for (int64_t i = 0; i < object_num; i++) { - object_ids.push_back(random_object_id()); - ObjectID& object_id = object_ids[i]; - - std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); - } - - // delete and create again - for (int64_t i = 0; i < loop_times; i++) { - ObjectID& object_id = object_ids[i % object_num]; - - ARROW_CHECK_OK(client_.Delete(object_id)); - - std::shared_ptr data; - ARROW_CHECK_OK(client_.Create(object_id, data_size, 0, 0, &data, kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); - } - - // delete all - ARROW_CHECK_OK(client_.Delete(object_ids)); -} - -TEST_F(TestPlasmaStore, GPUBufferLifetime) { - // ARROW-5924: GPU buffer is allowed to persist after Release() - ObjectID object_id = random_object_id(); - const int64_t data_size = 40; - - std::shared_ptr create_buff; - ARROW_CHECK_OK( - client_.Create(object_id, data_size, nullptr, 0, &create_buff, kGpuDeviceNumber)); - ARROW_CHECK_OK(client_.Seal(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); - - ObjectBuffer get_buff_1; - ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &get_buff_1)); - ObjectBuffer get_buff_2; - ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &get_buff_2)); - ARROW_CHECK_OK(client_.Release(object_id)); - ARROW_CHECK_OK(client_.Release(object_id)); - - ObjectBuffer get_buff_3; - ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &get_buff_3)); - ARROW_CHECK_OK(client_.Release(object_id)); - - ARROW_CHECK_OK(client_.Delete(object_id)); -} - -TEST_F(TestPlasmaStore, MultipleClientGPUTest) { - ObjectID object_id = random_object_id(); - std::vector object_buffers; - - // Test for object non-existence on the first client. - bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_FALSE(has_object); - - // Test for the object being in local Plasma store. - // First create and seal object on the second client. - int64_t data_size = 100; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - std::shared_ptr data; - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); - ARROW_CHECK_OK(client2_.Seal(object_id)); - // Test that the first client can get the object. - ARROW_CHECK_OK(client_.Get({object_id}, -1, &object_buffers)); - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_TRUE(has_object); - - // Test that one client disconnecting does not interfere with the other. - // First create object on the second client. - object_id = random_object_id(); - ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data, - kGpuDeviceNumber)); - // Disconnect the first client. - ARROW_CHECK_OK(client_.Disconnect()); - // Test that the second client can seal and get the created object. - ARROW_CHECK_OK(client2_.Seal(object_id)); - object_buffers.clear(); - ARROW_CHECK_OK(client2_.Contains(object_id, &has_object)); - ASSERT_TRUE(has_object); - ARROW_CHECK_OK(client2_.Get({object_id}, -1, &object_buffers)); - ASSERT_EQ(object_buffers.size(), 1); - ASSERT_EQ(object_buffers[0].device_num, kGpuDeviceNumber); - AssertCudaRead(object_buffers[0].metadata, {5}); -} - -#endif // PLASMA_CUDA - -} // namespace plasma - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - plasma::test_executable = std::string(argv[0]); - return RUN_ALL_TESTS(); -} diff --git a/src/ray/object_manager/plasma/test/external_store_tests.cc b/src/ray/object_manager/plasma/test/external_store_tests.cc deleted file mode 100644 index 2f7a0eacf..000000000 --- a/src/ray/object_manager/plasma/test/external_store_tests.cc +++ /dev/null @@ -1,143 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include -#include -#include - -#include -#include - -#include - -#include "arrow/testing/gtest_util.h" -#include "arrow/util/io_util.h" - -#include "ray/object_manager/plasma/client.h" -#include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/external_store.h" -#include "ray/object_manager/plasma/plasma.h" -#include "ray/object_manager/plasma/protocol.h" -#include "ray/object_manager/plasma/test_util.h" - -namespace plasma { - -using arrow::internal::TemporaryDir; - -std::string external_test_executable; // NOLINT - -void AssertObjectBufferEqual(const ObjectBuffer& object_buffer, - const std::string& metadata, const std::string& data) { - arrow::AssertBufferEqual(*object_buffer.metadata, metadata); - arrow::AssertBufferEqual(*object_buffer.data, data); -} - -class TestPlasmaStoreWithExternal : public ::testing::Test { - public: - // TODO(pcm): At the moment, stdout of the test gets mixed up with - // stdout of the object store. Consider changing that. - void SetUp() override { - ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("ext-test-")); - store_socket_name_ = temp_dir_->path().ToString() + "store"; - - std::string plasma_directory = - external_test_executable.substr(0, external_test_executable.find_last_of('/')); - std::string plasma_command = plasma_directory + - "/plasma-store-server -m 1024000 -e " + - "hashtable://test -s " + store_socket_name_ + - " 1> /tmp/log.stdout 2> /tmp/log.stderr & " + - "echo $! > " + store_socket_name_ + ".pid"; - PLASMA_CHECK_SYSTEM(system(plasma_command.c_str())); - ARROW_CHECK_OK(client_.Connect(store_socket_name_, "")); - } - - void TearDown() override { - ARROW_CHECK_OK(client_.Disconnect()); - // Kill plasma_store process that we started -#ifdef COVERAGE_BUILD - // Ask plasma_store to exit gracefully and give it time to write out - // coverage files - std::string plasma_term_command = - "kill -TERM `cat " + store_socket_name_ + ".pid` || exit 0"; - PLASMA_CHECK_SYSTEM(system(plasma_term_command.c_str())); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); -#endif - std::string plasma_kill_command = - "kill -KILL `cat " + store_socket_name_ + ".pid` || exit 0"; - PLASMA_CHECK_SYSTEM(system(plasma_kill_command.c_str())); - } - - protected: - PlasmaClient client_; - std::unique_ptr temp_dir_; - std::string store_socket_name_; -}; - -TEST_F(TestPlasmaStoreWithExternal, EvictionTest) { - std::vector object_ids; - std::string data(100 * 1024, 'x'); - std::string metadata; - for (int i = 0; i < 20; i++) { - ObjectID object_id = random_object_id(); - object_ids.push_back(object_id); - - // Test for object non-existence. - bool has_object; - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_FALSE(has_object); - - // Test for the object being in local Plasma store. - // Create and seal the object. - ARROW_CHECK_OK(client_.CreateAndSeal(object_id, data, metadata)); - // Test that the client can get the object. - ARROW_CHECK_OK(client_.Contains(object_id, &has_object)); - ASSERT_TRUE(has_object); - } - - for (int i = 0; i < 20; i++) { - // Since we are accessing objects sequentially, every object we - // access would be a cache "miss" owing to LRU eviction. - // Try and access the object from the plasma store first, and then try - // external store on failure. This should succeed to fetch the object. - // However, it may evict the next few objects. - std::vector object_buffers; - ARROW_CHECK_OK(client_.Get({object_ids[i]}, -1, &object_buffers)); - ASSERT_EQ(object_buffers.size(), 1); - ASSERT_EQ(object_buffers[0].device_num, 0); - ASSERT_TRUE(object_buffers[0].data); - AssertObjectBufferEqual(object_buffers[0], metadata, data); - } - - // Make sure we still cannot fetch objects that do not exist - std::vector object_buffers; - ARROW_CHECK_OK(client_.Get({random_object_id()}, 100, &object_buffers)); - ASSERT_EQ(object_buffers.size(), 1); - ASSERT_EQ(object_buffers[0].device_num, 0); - ASSERT_EQ(object_buffers[0].data, nullptr); - ASSERT_EQ(object_buffers[0].metadata, nullptr); -} - -} // namespace plasma - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - plasma::external_test_executable = std::string(argv[0]); - return RUN_ALL_TESTS(); -} diff --git a/src/ray/object_manager/plasma/test/serialization_tests.cc b/src/ray/object_manager/plasma/test/serialization_tests.cc deleted file mode 100644 index d1fcf0a3e..000000000 --- a/src/ray/object_manager/plasma/test/serialization_tests.cc +++ /dev/null @@ -1,333 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include - -#include -#include - -#include - -#include "arrow/testing/gtest_util.h" -#include "arrow/util/io_util.h" - -#include "ray/object_manager/plasma/common.h" -#include "ray/object_manager/plasma/io.h" -#include "ray/object_manager/plasma/plasma.h" -#include "ray/object_manager/plasma/protocol.h" -#include "ray/object_manager/plasma/test_util.h" - -namespace fb = plasma::flatbuf; - -namespace plasma { - -using arrow::internal::TemporaryDir; - -/** - * Seek to the beginning of a file and read a message from it. - * - * \param fd File descriptor of the file. - * \param message_type Message type that we expect in the file. - * - * \return Pointer to the content of the message. Needs to be freed by the - * caller. - */ -std::vector read_message_from_file(int fd, MessageType message_type) { - /* Go to the beginning of the file. */ - lseek(fd, 0, SEEK_SET); - MessageType type; - std::vector data; - Status s = ReadMessage(fd, &type, &data); - DCHECK_OK(s); - DCHECK_EQ(type, message_type); - return data; -} - -PlasmaObject random_plasma_object(void) { - unsigned int seed = static_cast(time(NULL)); - int random = rand_r(&seed); - PlasmaObject object = {}; - object.store_fd = random + 7; - object.data_offset = random + 1; - object.metadata_offset = random + 2; - object.data_size = random + 3; - object.metadata_size = random + 4; - object.device_num = 0; - return object; -} - -class TestPlasmaSerialization : public ::testing::Test { - public: - void SetUp() { ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("ser-test-")); } - - // Create a temporary file. - // A fd is returned which must be closed manually. The file itself - // is deleted at the end of the test. - int CreateTemporaryFile(void) { - char path[1024]; - - std::stringstream ss; - ss << temp_dir_->path().ToString() << "fileXXXXXX"; - strncpy(path, ss.str().c_str(), sizeof(path)); - ARROW_LOG(INFO) << "file path: '" << path << "'"; - return mkstemp(path); - } - - protected: - std::unique_ptr temp_dir_; -}; - -TEST_F(TestPlasmaSerialization, CreateRequest) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - int64_t data_size1 = 42; - int64_t metadata_size1 = 11; - int device_num1 = 0; - ASSERT_OK(SendCreateRequest(fd, object_id1, /*evict_if_full=*/true, data_size1, - metadata_size1, device_num1)); - std::vector data = - read_message_from_file(fd, MessageType::PlasmaCreateRequest); - ObjectID object_id2; - bool evict_if_full; - int64_t data_size2; - int64_t metadata_size2; - int device_num2; - ASSERT_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &evict_if_full, - &data_size2, &metadata_size2, &device_num2)); - ASSERT_TRUE(evict_if_full); - ASSERT_EQ(data_size1, data_size2); - ASSERT_EQ(metadata_size1, metadata_size2); - ASSERT_EQ(object_id1, object_id2); - ASSERT_EQ(device_num1, device_num2); - close(fd); -} - -TEST_F(TestPlasmaSerialization, CreateReply) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - PlasmaObject object1 = random_plasma_object(); - int64_t mmap_size1 = 1000000; - ASSERT_OK(SendCreateReply(fd, object_id1, &object1, PlasmaError::OK, mmap_size1)); - std::vector data = read_message_from_file(fd, MessageType::PlasmaCreateReply); - ObjectID object_id2; - PlasmaObject object2 = {}; - int store_fd; - int64_t mmap_size2; - ASSERT_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2, &store_fd, - &mmap_size2)); - ASSERT_EQ(object_id1, object_id2); - ASSERT_EQ(object1.store_fd, store_fd); - ASSERT_EQ(mmap_size1, mmap_size2); - ASSERT_EQ(memcmp(&object1, &object2, sizeof(object1)), 0); - close(fd); -} - -TEST_F(TestPlasmaSerialization, SealRequest) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - std::string digest1 = std::string(kDigestSize, 7); - ASSERT_OK(SendSealRequest(fd, object_id1, digest1)); - std::vector data = read_message_from_file(fd, MessageType::PlasmaSealRequest); - ObjectID object_id2; - std::string digest2; - ASSERT_OK(ReadSealRequest(data.data(), data.size(), &object_id2, &digest2)); - ASSERT_EQ(object_id1, object_id2); - ASSERT_EQ(memcmp(digest1.data(), digest2.data(), kDigestSize), 0); - close(fd); -} - -TEST_F(TestPlasmaSerialization, SealReply) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - ASSERT_OK(SendSealReply(fd, object_id1, PlasmaError::ObjectExists)); - std::vector data = read_message_from_file(fd, MessageType::PlasmaSealReply); - ObjectID object_id2; - Status s = ReadSealReply(data.data(), data.size(), &object_id2); - ASSERT_EQ(object_id1, object_id2); - ASSERT_TRUE(IsPlasmaObjectExists(s)); - close(fd); -} - -TEST_F(TestPlasmaSerialization, GetRequest) { - int fd = CreateTemporaryFile(); - ObjectID object_ids[2]; - object_ids[0] = random_object_id(); - object_ids[1] = random_object_id(); - int64_t timeout_ms = 1234; - ASSERT_OK(SendGetRequest(fd, object_ids, 2, timeout_ms)); - std::vector data = read_message_from_file(fd, MessageType::PlasmaGetRequest); - std::vector object_ids_return; - int64_t timeout_ms_return; - ASSERT_OK( - ReadGetRequest(data.data(), data.size(), object_ids_return, &timeout_ms_return)); - ASSERT_EQ(object_ids[0], object_ids_return[0]); - ASSERT_EQ(object_ids[1], object_ids_return[1]); - ASSERT_EQ(timeout_ms, timeout_ms_return); - close(fd); -} - -TEST_F(TestPlasmaSerialization, GetReply) { - int fd = CreateTemporaryFile(); - ObjectID object_ids[2]; - object_ids[0] = random_object_id(); - object_ids[1] = random_object_id(); - std::unordered_map plasma_objects; - plasma_objects[object_ids[0]] = random_plasma_object(); - plasma_objects[object_ids[1]] = random_plasma_object(); - std::vector store_fds = {1, 2, 3}; - std::vector mmap_sizes = {100, 200, 300}; - ASSERT_OK(SendGetReply(fd, object_ids, plasma_objects, 2, store_fds, mmap_sizes)); - - std::vector data = read_message_from_file(fd, MessageType::PlasmaGetReply); - ObjectID object_ids_return[2]; - PlasmaObject plasma_objects_return[2]; - std::vector store_fds_return; - std::vector mmap_sizes_return; - memset(&plasma_objects_return, 0, sizeof(plasma_objects_return)); - ASSERT_OK(ReadGetReply(data.data(), data.size(), object_ids_return, - &plasma_objects_return[0], 2, store_fds_return, - mmap_sizes_return)); - - ASSERT_EQ(object_ids[0], object_ids_return[0]); - ASSERT_EQ(object_ids[1], object_ids_return[1]); - - PlasmaObject po, po2; - for (int i = 0; i < 2; ++i) { - po = plasma_objects[object_ids[i]]; - po2 = plasma_objects_return[i]; - ASSERT_EQ(po, po2); - } - ASSERT_TRUE(store_fds == store_fds_return); - ASSERT_TRUE(mmap_sizes == mmap_sizes_return); - close(fd); -} - -TEST_F(TestPlasmaSerialization, ReleaseRequest) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - ASSERT_OK(SendReleaseRequest(fd, object_id1)); - std::vector data = - read_message_from_file(fd, MessageType::PlasmaReleaseRequest); - ObjectID object_id2; - ASSERT_OK(ReadReleaseRequest(data.data(), data.size(), &object_id2)); - ASSERT_EQ(object_id1, object_id2); - close(fd); -} - -TEST_F(TestPlasmaSerialization, ReleaseReply) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - ASSERT_OK(SendReleaseReply(fd, object_id1, PlasmaError::ObjectExists)); - std::vector data = read_message_from_file(fd, MessageType::PlasmaReleaseReply); - ObjectID object_id2; - Status s = ReadReleaseReply(data.data(), data.size(), &object_id2); - ASSERT_EQ(object_id1, object_id2); - ASSERT_TRUE(IsPlasmaObjectExists(s)); - close(fd); -} - -TEST_F(TestPlasmaSerialization, DeleteRequest) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - ASSERT_OK(SendDeleteRequest(fd, std::vector{object_id1})); - std::vector data = - read_message_from_file(fd, MessageType::PlasmaDeleteRequest); - std::vector object_vec; - ASSERT_OK(ReadDeleteRequest(data.data(), data.size(), &object_vec)); - ASSERT_EQ(object_vec.size(), 1); - ASSERT_EQ(object_id1, object_vec[0]); - close(fd); -} - -TEST_F(TestPlasmaSerialization, DeleteReply) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - PlasmaError error1 = PlasmaError::ObjectExists; - ASSERT_OK(SendDeleteReply(fd, std::vector{object_id1}, - std::vector{error1})); - std::vector data = read_message_from_file(fd, MessageType::PlasmaDeleteReply); - std::vector object_vec; - std::vector error_vec; - Status s = ReadDeleteReply(data.data(), data.size(), &object_vec, &error_vec); - ASSERT_EQ(object_vec.size(), 1); - ASSERT_EQ(object_id1, object_vec[0]); - ASSERT_EQ(error_vec.size(), 1); - ASSERT_TRUE(error_vec[0] == PlasmaError::ObjectExists); - ASSERT_TRUE(s.ok()); - close(fd); -} - -TEST_F(TestPlasmaSerialization, EvictRequest) { - int fd = CreateTemporaryFile(); - int64_t num_bytes = 111; - ASSERT_OK(SendEvictRequest(fd, num_bytes)); - std::vector data = read_message_from_file(fd, MessageType::PlasmaEvictRequest); - int64_t num_bytes_received; - ASSERT_OK(ReadEvictRequest(data.data(), data.size(), &num_bytes_received)); - ASSERT_EQ(num_bytes, num_bytes_received); - close(fd); -} - -TEST_F(TestPlasmaSerialization, EvictReply) { - int fd = CreateTemporaryFile(); - int64_t num_bytes = 111; - ASSERT_OK(SendEvictReply(fd, num_bytes)); - std::vector data = read_message_from_file(fd, MessageType::PlasmaEvictReply); - int64_t num_bytes_received; - ASSERT_OK(ReadEvictReply(data.data(), data.size(), num_bytes_received)); - ASSERT_EQ(num_bytes, num_bytes_received); - close(fd); -} - -TEST_F(TestPlasmaSerialization, DataRequest) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - const char* address1 = "address1"; - int port1 = 12345; - ASSERT_OK(SendDataRequest(fd, object_id1, address1, port1)); - /* Reading message back. */ - std::vector data = read_message_from_file(fd, MessageType::PlasmaDataRequest); - ObjectID object_id2; - char* address2; - int port2; - ASSERT_OK(ReadDataRequest(data.data(), data.size(), &object_id2, &address2, &port2)); - ASSERT_EQ(object_id1, object_id2); - ASSERT_EQ(strcmp(address1, address2), 0); - ASSERT_EQ(port1, port2); - free(address2); - close(fd); -} - -TEST_F(TestPlasmaSerialization, DataReply) { - int fd = CreateTemporaryFile(); - ObjectID object_id1 = random_object_id(); - int64_t object_size1 = 146; - int64_t metadata_size1 = 198; - ASSERT_OK(SendDataReply(fd, object_id1, object_size1, metadata_size1)); - /* Reading message back. */ - std::vector data = read_message_from_file(fd, MessageType::PlasmaDataReply); - ObjectID object_id2; - int64_t object_size2; - int64_t metadata_size2; - ASSERT_OK(ReadDataReply(data.data(), data.size(), &object_id2, &object_size2, - &metadata_size2)); - ASSERT_EQ(object_id1, object_id2); - ASSERT_EQ(object_size1, object_size2); - ASSERT_EQ(metadata_size1, metadata_size2); -} - -} // namespace plasma diff --git a/src/ray/object_manager/plasma/test_util.h b/src/ray/object_manager/plasma/test_util.h deleted file mode 100644 index 1151b3fbd..000000000 --- a/src/ray/object_manager/plasma/test_util.h +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include "ray/object_manager/plasma/common.h" - -namespace plasma { - -ObjectID random_object_id() { - static uint32_t random_seed = 0; - std::mt19937 gen(random_seed++); - std::uniform_int_distribution d(0, std::numeric_limits::max()); - ObjectID result; - uint8_t* data = result.mutable_data(); - std::generate(data, data + kUniqueIDSize, - [&d, &gen] { return static_cast(d(gen)); }); - return result; -} - -#define PLASMA_CHECK_SYSTEM(expr) \ - do { \ - int status__ = (expr); \ - EXPECT_TRUE(WIFEXITED(status__)); \ - EXPECT_EQ(WEXITSTATUS(status__), 0); \ - } while (false); - -} // namespace plasma diff --git a/src/ray/plasma/store_exec.cc b/src/ray/plasma/store_exec.cc index 3b689c6e6..d1bd5d9c6 100644 --- a/src/ray/plasma/store_exec.cc +++ b/src/ray/plasma/store_exec.cc @@ -12,16 +12,20 @@ #define __STDC_FORMAT_MACROS #endif -using arrow::util::ArrowLog; - void HandleSignal(int signal) { if (signal == SIGTERM) { - ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server..."; + RAY_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server..."; plasma::plasma_store_runner->Stop(); } } int main(int argc, char *argv[]) { + InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog, + ray::RayLog::ShutDownRayLog, argv[0], + ray::RayLogLevel::INFO, + /*log_dir=*/""); + ray::RayLog::InstallFailureSignalHandler(); + std::string socket_name; // Directory where plasma memory mapped files are stored. std::string plasma_directory; @@ -47,7 +51,7 @@ int main(int argc, char *argv[]) { case 'm': { char extra; int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra); - ARROW_CHECK(scanned == 1); + RAY_CHECK(scanned == 1); break; } case 'z': { @@ -60,7 +64,6 @@ int main(int argc, char *argv[]) { } if (!keep_idle) { - ArrowLog::InstallFailureSignalHandler(); plasma::plasma_store_runner.reset( new plasma::PlasmaStoreRunner(socket_name, system_memory, hugepages_enabled, plasma_directory, external_store_endpoint)); @@ -73,11 +76,9 @@ int main(int argc, char *argv[]) { signal(SIGTERM, HandleSignal); plasma::plasma_store_runner->Start(); plasma::plasma_store_runner.reset(); - ArrowLog::UninstallSignalAction(); } else { - printf( - "The Plasma Store is started with the '-z' flag, " - "and it will run idle as a placeholder."); + RAY_LOG(INFO) << "The Plasma Store is started with the '-z' flag, " + << "and it will run idle as a placeholder."; while (true) { std::this_thread::sleep_for(std::chrono::hours(1000)); }