Fix ray arrow logs (#9097)

* convert arrow logs to ray logs

* remove extra plasma tests and modules
This commit is contained in:
Siyuan (Ryans) Zhuang
2020-06-23 10:02:30 -07:00
committed by GitHub
parent cf0894d396
commit 306ca75737
20 changed files with 221 additions and 1938 deletions
+41 -41
View File
@@ -181,14 +181,14 @@ class ClientMmapTableEntry {
pointer_ = reinterpret_cast<uint8_t*>(MapViewOfFile(reinterpret_cast<HANDLE>(fh_get(fd)), FILE_MAP_ALL_ACCESS, 0, 0, length_));
// TODO(pcm): Don't fail here, instead return a Status.
if (pointer_ == NULL) {
ARROW_LOG(FATAL) << "mmap failed";
RAY_LOG(FATAL) << "mmap failed";
}
#else
pointer_ = reinterpret_cast<uint8_t*>(
mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
// TODO(pcm): Don't fail here, instead return a Status.
if (pointer_ == MAP_FAILED) {
ARROW_LOG(FATAL) << "mmap failed";
RAY_LOG(FATAL) << "mmap failed";
}
#endif
close(fd); // Closing this fd has an effect on performance.
@@ -207,7 +207,7 @@ class ClientMmapTableEntry {
r = munmap(pointer_, length_);
#endif
if (r != 0) {
ARROW_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno;
RAY_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno;
}
}
@@ -382,7 +382,7 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_
// process before.
uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
auto entry = mmap_table_.find(store_fd_val);
ARROW_CHECK(entry != mmap_table_.end());
RAY_CHECK(entry != mmap_table_.end());
return entry->second->pointer();
}
@@ -397,7 +397,7 @@ int PlasmaClient::Impl::GetStoreFd(int store_fd) {
auto entry = mmap_table_.find(store_fd);
if (entry == mmap_table_.end()) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0) << "recv not successful";
RAY_CHECK(fd >= 0) << "recv not successful";
return fd;
} else {
return entry->second->fd();
@@ -421,7 +421,7 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
object_entry = objects_in_use_[object_id].get();
} else {
object_entry = elem->second.get();
ARROW_CHECK(object_entry->count > 0);
RAY_CHECK(object_entry->count > 0);
}
// Increment the count of the number of instances of this object that are
// being used by this client. The corresponding decrement should happen in
@@ -435,7 +435,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
RAY_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
<< data_size << " and metadata size " << metadata_size;
RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, evict_if_full, data_size,
metadata_size, device_num));
@@ -451,10 +451,10 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
// descriptor.
if (device_num == 0) {
int fd = GetStoreFd(store_fd);
ARROW_CHECK(object.data_size == data_size);
ARROW_CHECK(object.metadata_size == metadata_size);
RAY_CHECK(object.data_size == data_size);
RAY_CHECK(object.metadata_size == metadata_size);
// The metadata should come right after the data.
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
RAY_CHECK(object.metadata_offset == object.data_offset + data_size);
*data = std::make_shared<PlasmaMutableBuffer>(
shared_from_this(), LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset,
data_size);
@@ -483,7 +483,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
}
*data = MakeBufferFromGpuProcessHandle(handle);
#else
ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
RAY_LOG(FATAL) << "Arrow GPU library is not enabled.";
#endif
}
@@ -505,7 +505,7 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
RAY_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, evict_if_full, data,
metadata));
@@ -522,7 +522,7 @@ Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector<ObjectID>& objec
bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
ARROW_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_;
RAY_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_;
RETURN_NOT_OK(SendCreateAndSealBatchRequest(store_conn_, object_ids, evict_if_full,
data, metadata));
@@ -551,9 +551,9 @@ Status PlasmaClient::Impl::GetBuffers(
// This client created the object but hasn't sealed it. If we call Get
// with no timeout, we will deadlock, because this client won't be able to
// call Seal.
ARROW_CHECK(timeout_ms != -1)
RAY_CHECK(timeout_ms != -1)
<< "Plasma client called get on an unsealed object that it created";
ARROW_LOG(WARNING)
RAY_LOG(WARNING)
<< "Attempting to get an object that this client created but hasn't sealed.";
all_present = false;
} else {
@@ -568,11 +568,11 @@ Status PlasmaClient::Impl::GetBuffers(
#ifdef PLASMA_CUDA
std::lock_guard<std::mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_ids[i]);
ARROW_CHECK(iter != gpu_object_map.end());
RAY_CHECK(iter != gpu_object_map.end());
iter->second->client_count++;
physical_buf = MakeBufferFromGpuProcessHandle(iter->second);
#else
ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
RAY_LOG(FATAL) << "Arrow GPU library is not enabled.";
#endif
}
physical_buf = wrap_buffer(object_ids[i], physical_buf);
@@ -612,12 +612,12 @@ Status PlasmaClient::Impl::GetBuffers(
}
for (int64_t i = 0; i < num_objects; ++i) {
DCHECK(received_object_ids[i] == object_ids[i]);
RAY_DCHECK(received_object_ids[i] == object_ids[i]);
object = &object_data[i];
if (object_buffers[i].data) {
// If the object was already in use by the client, then the store should
// have returned it.
DCHECK_NE(object->data_size, -1);
RAY_DCHECK(object->data_size != -1);
// We've already filled out the information for this object, so we can
// just continue.
continue;
@@ -648,7 +648,7 @@ Status PlasmaClient::Impl::GetBuffers(
physical_buf = MakeBufferFromGpuProcessHandle(iter->second);
}
#else
ARROW_LOG(FATAL) << "Arrow GPU library is not enabled.";
RAY_LOG(FATAL) << "Arrow GPU library is not enabled.";
#endif
}
// Finish filling out the return values.
@@ -663,8 +663,8 @@ Status PlasmaClient::Impl::GetBuffers(
} else {
// The object was not retrieved. The caller can detect this condition
// by checking the boolean value of the metadata/data buffers.
DCHECK(!object_buffers[i].metadata);
DCHECK(!object_buffers[i].data);
RAY_DCHECK(!object_buffers[i].metadata);
RAY_DCHECK(!object_buffers[i].data);
}
}
return Status::OK();
@@ -694,8 +694,8 @@ Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects,
Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) {
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end());
ARROW_CHECK(object_entry->second->count == 0);
RAY_CHECK(object_entry != objects_in_use_.end());
RAY_CHECK(object_entry->second->count == 0);
// Remove the entry from the hash table of objects currently in use.
objects_in_use_.erase(object_id);
@@ -710,13 +710,13 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
return Status::OK();
}
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end());
RAY_CHECK(object_entry != objects_in_use_.end());
#ifdef PLASMA_CUDA
if (object_entry->second->object.device_num != 0) {
std::lock_guard<std::mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_id);
ARROW_CHECK(iter != gpu_object_map.end());
RAY_CHECK(iter != gpu_object_map.end());
if (--iter->second->client_count == 0) {
delete iter->second;
gpu_object_map.erase(iter);
@@ -725,7 +725,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
#endif
object_entry->second->count -= 1;
ARROW_CHECK(object_entry->second->count >= 0);
RAY_CHECK(object_entry->second->count >= 0);
// Check if the client is no longer using this object.
if (object_entry->second->count == 0) {
// Tell the store that the client no longer needs the object.
@@ -754,7 +754,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaContainsReply, &buffer));
ObjectID object_id2;
DCHECK_GT(buffer.size(), 0);
RAY_DCHECK(buffer.size() > 0);
RETURN_NOT_OK(
ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object));
}
@@ -796,7 +796,7 @@ bool PlasmaClient::Impl::ComputeObjectHashParallel(XXH64_state_t* hash_state,
&threadhash[num_threads]);
for (auto& fut : futures) {
ARROW_CHECK_OK(fut.status());
RAY_ARROW_CHECK_OK(fut.status());
}
XXH64_update(hash_state, reinterpret_cast<unsigned char*>(threadhash),
@@ -816,8 +816,8 @@ uint64_t PlasmaClient::Impl::ComputeObjectHash(const ObjectBuffer& obj_buffer) {
uint64_t PlasmaClient::Impl::ComputeObjectHashCPU(const uint8_t* data, int64_t data_size,
const uint8_t* metadata,
int64_t metadata_size) {
DCHECK(metadata);
DCHECK(data);
RAY_DCHECK(metadata);
RAY_DCHECK(data);
XXH64_state_t hash_state;
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
if (data_size >= kBytesInMB) {
@@ -854,7 +854,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSealReply, &buffer));
ObjectID sealed_id;
RETURN_NOT_OK(ReadSealReply(buffer.data(), buffer.size(), &sealed_id));
ARROW_CHECK(sealed_id == object_id);
RAY_CHECK(sealed_id == object_id);
// We call PlasmaClient::Release to decrement the number of instances of this
// object
// that are currently being used by this client. The corresponding increment
@@ -866,9 +866,9 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end())
RAY_CHECK(object_entry != objects_in_use_.end())
<< "Plasma client called abort on an object without a reference to it";
ARROW_CHECK(!object_entry->second->is_sealed)
RAY_CHECK(!object_entry->second->is_sealed)
<< "Plasma client called abort on a sealed object";
// Make sure that the Plasma client only has one reference to the object. If
@@ -882,8 +882,8 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
if (object_entry->second->object.device_num != 0) {
std::lock_guard<std::mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_id);
ARROW_CHECK(iter != gpu_object_map.end());
ARROW_CHECK(iter->second->client_count == 1);
RAY_CHECK(iter != gpu_object_map.end());
RAY_CHECK(iter->second->client_count == 1);
delete iter->second;
gpu_object_map.erase(iter);
}
@@ -918,7 +918,7 @@ Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
RETURN_NOT_OK(SendDeleteRequest(store_conn_, not_in_use_ids));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaDeleteReply, &buffer));
DCHECK_GT(buffer.size(), 0);
RAY_DCHECK(buffer.size() > 0);
std::vector<PlasmaError> error_codes;
not_in_use_ids.clear();
RETURN_NOT_OK(
@@ -983,16 +983,16 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
// Make the socket non-blocking.
#ifdef _WINSOCKAPI_
unsigned long value = 1;
ARROW_CHECK(ioctlsocket(sock[1], FIONBIO, &value) == 0);
RAY_CHECK(ioctlsocket(sock[1], FIONBIO, &value) == 0);
#else
int flags = fcntl(sock[1], F_GETFL, 0);
ARROW_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0);
RAY_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0);
#endif
// Tell the Plasma store about the subscription.
RETURN_NOT_OK(SendSubscribeRequest(store_conn_));
// Send the file descriptor that the Plasma store should use to push
// notifications about sealed objects to this client.
ARROW_CHECK(send_fd(store_conn_, sock[1]) >= 0);
RAY_CHECK(send_fd(store_conn_, sock[1]) >= 0);
close(sock[1]);
// Return the file descriptor that the client should use to read notifications
// about sealed objects.
@@ -1063,7 +1063,7 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
return Status::NotImplemented("plasma manager is no longer supported");
}
if (release_delay != 0) {
ARROW_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect "
RAY_LOG(WARNING) << "The release_delay parameter in PlasmaClient::Connect "
<< "is deprecated";
}
// Send a ConnectRequest to the store to get its memory capacity.
+10 -10
View File
@@ -59,7 +59,7 @@ int fake_munmap(void*, int64_t);
#undef HAVE_MORECORE
#undef DEFAULT_GRANULARITY
// dlmalloc.c defined DEBUG which will conflict with ARROW_LOG(DEBUG).
// dlmalloc.c defined DEBUG which will conflict with RAY_LOG(DEBUG).
#ifdef DEBUG
#undef DEBUG
#endif
@@ -90,12 +90,12 @@ int create_buffer(int64_t size) {
file_name.push_back('\0');
fd = mkstemp(&file_name[0]);
if (fd < 0) {
ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0];
RAY_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0];
return -1;
}
// Immediately unlink the file so we do not leave traces in the system.
if (unlink(&file_name[0]) != 0) {
ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0];
RAY_LOG(FATAL) << "failed to unlink file " << &file_name[0];
return -1;
}
if (!plasma_config->hugepages_enabled) {
@@ -103,7 +103,7 @@ int create_buffer(int64_t size) {
// needed for files that are backed by the huge page fs, see also
// http://www.mail-archive.com/kvm-devel@lists.sourceforge.net/msg14737.html
if (ftruncate(fd, (off_t)size) != 0) {
ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0];
RAY_LOG(FATAL) << "failed to ftruncate file " << &file_name[0];
return -1;
}
}
@@ -118,7 +118,7 @@ void* fake_mmap(size_t size) {
size += kMmapRegionsGap;
int fd = create_buffer(size);
ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
RAY_CHECK(fd >= 0) << "Failed to create buffer during mmap";
// MAP_POPULATE can be used to pre-populate the page tables for this memory region
// which avoids work when accessing the pages later. However it causes long pauses
// when mmapping the files. Only supported on Linux.
@@ -126,15 +126,15 @@ void* fake_mmap(size_t size) {
#ifdef _WIN32
pointer = MapViewOfFile(reinterpret_cast<HANDLE>(fh_get(fd)), FILE_MAP_ALL_ACCESS, 0, 0, size);
if (pointer == NULL) {
ARROW_LOG(ERROR) << "MapViewOfFile failed with error: " << GetLastError();
RAY_LOG(ERROR) << "MapViewOfFile failed with error: " << GetLastError();
return reinterpret_cast<void*>(-1);
}
#else
pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (pointer == MAP_FAILED) {
ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno);
RAY_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno);
if (errno == ENOMEM && plasma_config->hugepages_enabled) {
ARROW_LOG(ERROR)
RAY_LOG(ERROR)
<< " (this probably means you have to increase /proc/sys/vm/nr_hugepages)";
}
return pointer;
@@ -150,12 +150,12 @@ void* fake_mmap(size_t size) {
// We lie to dlmalloc about where mapped memory actually lives.
pointer = pointer_advance(pointer, kMmapRegionsGap);
ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
RAY_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
return pointer;
}
int fake_munmap(void* addr, int64_t size) {
ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")";
RAY_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")";
addr = pointer_retreat(addr, kMmapRegionsGap);
size += kMmapRegionsGap;
@@ -25,7 +25,7 @@ namespace plasma {
void LRUCache::Add(const ObjectID& key, int64_t size) {
auto it = item_map_.find(key);
ARROW_CHECK(it == item_map_.end());
RAY_CHECK(it == item_map_.end());
// Note that it is important to use a list so the iterators stay valid.
item_list_.emplace_front(key, size);
item_map_.emplace(key, item_list_.begin());
@@ -41,15 +41,15 @@ int64_t LRUCache::Remove(const ObjectID& key) {
used_capacity_ -= size;
item_list_.erase(it->second);
item_map_.erase(it);
ARROW_CHECK(used_capacity_ >= 0) << DebugString();
RAY_CHECK(used_capacity_ >= 0) << DebugString();
return size;
}
void LRUCache::AdjustCapacity(int64_t delta) {
ARROW_LOG(INFO) << "adjusting global lru capacity from " << Capacity() << " to "
RAY_LOG(INFO) << "adjusting global lru capacity from " << Capacity() << " to "
<< (Capacity() + delta) << " (max " << OriginalCapacity() << ")";
capacity_ += delta;
ARROW_CHECK(used_capacity_ >= 0) << DebugString();
RAY_CHECK(used_capacity_ >= 0) << DebugString();
}
int64_t LRUCache::Capacity() const { return capacity_; }
@@ -128,10 +128,10 @@ bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_t
// up to 20% of the total capacity.
int64_t space_to_free =
std::max(required_space, PlasmaAllocator::GetFootprintLimit() / 5);
ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects";
RAY_LOG(DEBUG) << "not enough space to create this object, so evicting objects";
// Choose some objects to evict, and update the return pointers.
int64_t num_bytes_evicted = ChooseObjectsToEvict(space_to_free, objects_to_evict);
ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting "
RAY_LOG(INFO) << "There is not enough space to create this object, so evicting "
<< objects_to_evict->size() << " objects to free up "
<< num_bytes_evicted << " bytes. The number of bytes in use (before "
<< "this eviction) is " << PlasmaAllocator::Allocated() << ".";
+6 -6
View File
@@ -16,7 +16,7 @@
#include <string.h>
#include "arrow/util/logging.h"
#include "ray/util/logging.h"
#ifdef _WIN32
#include <ws2tcpip.h> // socklen_t
@@ -71,7 +71,7 @@ int send_fd(int conn, int fd) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
} else if (errno == EMSGSIZE) {
ARROW_LOG(WARNING) << "Failed to send file descriptor"
RAY_LOG(WARNING) << "Failed to send file descriptor"
<< " (errno = EMSGSIZE), retrying.";
// If we failed to send the file descriptor, loop until we have sent it
// successfully. TODO(rkn): This is problematic for two reasons. First
@@ -81,14 +81,14 @@ int send_fd(int conn, int fd) {
// plasma store event loop which should never happen.
continue;
} else {
ARROW_LOG(INFO) << "Error in send_fd (errno = " << errno << ")";
RAY_LOG(INFO) << "Error in send_fd (errno = " << errno << ")";
return static_cast<int>(r);
}
} else if (r == 0) {
ARROW_LOG(INFO) << "Encountered unexpected EOF";
RAY_LOG(INFO) << "Encountered unexpected EOF";
return 0;
} else {
ARROW_CHECK(r > 0);
RAY_CHECK(r > 0);
return static_cast<int>(r);
}
}
@@ -111,7 +111,7 @@ int recv_fd(int conn) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
} else {
ARROW_LOG(INFO) << "Error in recv_fd (errno = " << errno << ")";
RAY_LOG(INFO) << "Error in recv_fd (errno = " << errno << ")";
return -1;
}
} else {
@@ -1,58 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <memory>
#include <string>
#include "arrow/util/logging.h"
#include "ray/object_manager/plasma/hash_table_store.h"
namespace plasma {
Status HashTableStore::Connect(const std::string& endpoint) { return Status::OK(); }
Status HashTableStore::Put(const std::vector<ObjectID>& ids,
const std::vector<std::shared_ptr<Buffer>>& data) {
for (size_t i = 0; i < ids.size(); ++i) {
table_[ids[i]] = data[i]->ToString();
}
return Status::OK();
}
Status HashTableStore::Get(const std::vector<ObjectID>& ids,
std::vector<std::shared_ptr<Buffer>> buffers) {
ARROW_CHECK(ids.size() == buffers.size());
for (size_t i = 0; i < ids.size(); ++i) {
bool valid;
HashTable::iterator result;
{
result = table_.find(ids[i]);
valid = result != table_.end();
}
if (valid) {
ARROW_CHECK(buffers[i]->size() == static_cast<int64_t>(result->second.size()));
std::memcpy(buffers[i]->mutable_data(), result->second.data(),
result->second.size());
}
}
return Status::OK();
}
REGISTER_EXTERNAL_STORE("hashtable", HashTableStore);
} // namespace plasma
@@ -1,50 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "ray/object_manager/plasma/external_store.h"
namespace plasma {
// This is a sample implementation for an external store, for illustration
// purposes only.
class HashTableStore : public ExternalStore {
public:
HashTableStore() = default;
Status Connect(const std::string& endpoint) override;
Status Get(const std::vector<ObjectID>& ids,
std::vector<std::shared_ptr<Buffer>> buffers) override;
Status Put(const std::vector<ObjectID>& ids,
const std::vector<std::shared_ptr<Buffer>>& data) override;
private:
typedef std::unordered_map<ObjectID, std::string> HashTable;
HashTable table_;
};
} // namespace plasma
+14 -14
View File
@@ -58,7 +58,7 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
} else if (nbytes == 0) {
return Status::IOError("Encountered unexpected EOF");
}
ARROW_CHECK(nbytes > 0);
RAY_CHECK(nbytes > 0);
bytesleft -= nbytes;
offset += nbytes;
}
@@ -89,7 +89,7 @@ Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
} else if (0 == nbytes) {
return Status::IOError("Encountered unexpected EOF");
}
ARROW_CHECK(nbytes > 0);
RAY_CHECK(nbytes > 0);
bytesleft -= nbytes;
offset += nbytes;
}
@@ -101,7 +101,7 @@ Status ReadMessage(int fd, MessageType* type, std::vector<uint8_t>* buffer) {
int64_t version;
RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)),
*type = MessageType::PlasmaDisconnectClient);
ARROW_CHECK(version == kPlasmaProtocolVersion) << "version = " << version;
RAY_CHECK(version == kPlasmaProtocolVersion) << "version = " << version;
RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)),
*type = MessageType::PlasmaDisconnectClient);
int64_t length_temp;
@@ -139,18 +139,18 @@ int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) {
socket_address.in.sin_addr.s_addr = inet_addr(addr.substr(0, i).c_str());
socket_address.in.sin_port = htons(static_cast<short>(atoi(addr.substr(j).c_str())));
if (socket_address.in.sin_addr.s_addr == INADDR_NONE) {
ARROW_LOG(ERROR) << "Socket address is not a valid IPv4 address: " << pathname;
RAY_LOG(ERROR) << "Socket address is not a valid IPv4 address: " << pathname;
return -1;
}
if (socket_address.in.sin_port == htons(0)) {
ARROW_LOG(ERROR) << "Socket address is missing a valid port: " << pathname;
RAY_LOG(ERROR) << "Socket address is missing a valid port: " << pathname;
return -1;
}
} else {
addrlen = sizeof(socket_address.un);
socket_address.un.sun_family = AF_UNIX;
if (pathname.size() + 1 > sizeof(socket_address.un.sun_path)) {
ARROW_LOG(ERROR) << "Socket pathname is too long.";
RAY_LOG(ERROR) << "Socket pathname is too long.";
return -1;
}
strncpy(socket_address.un.sun_path, pathname.c_str(), pathname.size() + 1);
@@ -158,7 +158,7 @@ int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) {
int socket_fd = socket(socket_address.addr.sa_family, SOCK_STREAM, 0);
if (socket_fd < 0) {
ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
RAY_LOG(ERROR) << "socket() failed for pathname " << pathname;
return -1;
}
if (shall_listen) {
@@ -166,7 +166,7 @@ int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) {
int on = 1;
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&on),
sizeof(on)) < 0) {
ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname;
RAY_LOG(ERROR) << "setsockopt failed for pathname " << pathname;
close(socket_fd);
return -1;
}
@@ -179,13 +179,13 @@ int ConnectOrListenIpcSock(const std::string& pathname, bool shall_listen) {
#endif
}
if (bind(socket_fd, &socket_address.addr, addrlen) != 0) {
ARROW_LOG(ERROR) << "Bind failed for pathname " << pathname;
RAY_LOG(ERROR) << "Bind failed for pathname " << pathname;
close(socket_fd);
return -1;
}
if (listen(socket_fd, 128) == -1) {
ARROW_LOG(ERROR) << "Could not listen to socket " << pathname;
RAY_LOG(ERROR) << "Could not listen to socket " << pathname;
close(socket_fd);
return -1;
}
@@ -209,7 +209,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
}
*fd = ConnectOrListenIpcSock(pathname, false);
while (*fd < 0 && num_retries > 0) {
ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname
RAY_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname
<< ", retrying " << num_retries << " more times";
// Sleep for timeout milliseconds.
usleep(static_cast<int>(timeout * 1000));
@@ -228,7 +228,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
int AcceptClient(int socket_fd) {
int client_fd = accept(socket_fd, NULL, NULL);
if (client_fd < 0) {
ARROW_LOG(ERROR) << "Error reading from socket.";
RAY_LOG(ERROR) << "Error reading from socket.";
return -1;
}
return client_fd;
@@ -239,7 +239,7 @@ std::unique_ptr<uint8_t[]> ReadMessageAsync(int sock) {
Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
if (!s.ok()) {
// The other side has closed the socket.
ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
RAY_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
close(sock);
return NULL;
}
@@ -247,7 +247,7 @@ std::unique_ptr<uint8_t[]> ReadMessageAsync(int sock) {
s = ReadBytes(sock, message.get(), size);
if (!s.ok()) {
// The other side has closed the socket.
ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
RAY_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
close(sock);
return NULL;
}
+1 -1
View File
@@ -61,7 +61,7 @@ int64_t GetMmapSize(int fd) {
return entry.second.size;
}
}
ARROW_LOG(FATAL) << "failed to find entry in mmap_records for fd " << fd;
RAY_LOG(FATAL) << "failed to find entry in mmap_records for fd " << fd;
return -1; // This code is never reached.
}
+2 -2
View File
@@ -38,14 +38,14 @@ int WarnIfSigpipe(int status, int client_sock) {
return 0;
}
if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
ARROW_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when "
RAY_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when "
"sending a message to client on fd "
<< client_sock
<< ". The client on the other end may "
"have hung up.";
return errno;
}
ARROW_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << ".";
RAY_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << ".";
return -1; // This is never reached.
}
+3 -2
View File
@@ -31,12 +31,13 @@
#include <unordered_set>
#include <vector>
#include "ray/common/status.h"
#include "ray/object_manager/plasma/compat.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/util/logging.h"
#ifdef PLASMA_CUDA
using arrow::cuda::CudaIpcMemHandle;
@@ -53,7 +54,7 @@ struct ObjectInfoT;
Status _s = (s); \
if (!_s.ok()) { \
if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { \
ARROW_LOG(WARNING) \
RAY_LOG(WARNING) \
<< "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \
"sending a message to client on fd " \
<< fd_ \
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include <arrow/util/logging.h>
#include "ray/util/logging.h"
#include "ray/object_manager/plasma/malloc.h"
#include "ray/object_manager/plasma/plasma_allocator.h"
@@ -35,7 +35,7 @@ void* PlasmaAllocator::Memalign(size_t alignment, size_t bytes) {
return nullptr;
}
void* mem = dlmemalign(alignment, bytes);
ARROW_CHECK(mem);
RAY_CHECK(mem);
allocated_ += bytes;
return mem;
}
+65 -65
View File
@@ -72,7 +72,7 @@ flatbuffers::Offset<flatbuffers::Vector<int64_t>> ToFlatbuffer(
Status PlasmaReceive(int sock, MessageType message_type, std::vector<uint8_t>* buffer) {
MessageType type;
RETURN_NOT_OK(ReadMessage(sock, &type, buffer));
ARROW_CHECK(type == message_type)
RAY_CHECK(type == message_type)
<< "type = " << static_cast<int64_t>(type)
<< ", message_type = " << static_cast<int64_t>(message_type);
return Status::OK();
@@ -121,7 +121,7 @@ Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
return MakePlasmaError(PlasmaErrorCode::PlasmaStoreFull,
"object does not fit in the plasma store");
default:
ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
RAY_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
}
return Status::OK();
}
@@ -138,9 +138,9 @@ Status SendSetOptionsRequest(int sock, const std::string& client_name,
Status ReadSetOptionsRequest(uint8_t* data, size_t size, std::string* client_name,
int64_t* output_memory_quota) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSetOptionsRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*client_name = std::string(message->client_name()->str());
*output_memory_quota = message->output_memory_quota();
return Status::OK();
@@ -153,9 +153,9 @@ Status SendSetOptionsReply(int sock, PlasmaError error) {
}
Status ReadSetOptionsReply(uint8_t* data, size_t size) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSetOptionsReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
return PlasmaErrorStatus(message->error());
}
@@ -174,9 +174,9 @@ Status SendGetDebugStringReply(int sock, const std::string& debug_string) {
}
Status ReadGetDebugStringReply(uint8_t* data, size_t size, std::string* debug_string) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaGetDebugStringReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*debug_string = message->debug_string()->str();
return Status::OK();
}
@@ -195,9 +195,9 @@ Status SendCreateRequest(int sock, ObjectID object_id, bool evict_if_full,
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
bool* evict_if_full, int64_t* data_size, int64_t* metadata_size,
int* device_num) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*evict_if_full = message->evict_if_full();
*data_size = message->data_size();
*metadata_size = message->metadata_size();
@@ -232,7 +232,7 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
#ifdef PLASMA_CUDA
crb.add_ipc_handle(ipc_handle);
#else
ARROW_LOG(FATAL) << "This should be unreachable.";
RAY_LOG(FATAL) << "This should be unreachable.";
#endif
}
auto message = crb.Finish();
@@ -241,9 +241,9 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
PlasmaObject* object, int* store_fd, int64_t* mmap_size) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
object->store_fd = message->plasma_object()->segment_index();
object->data_offset = message->plasma_object()->data_offset();
@@ -277,9 +277,9 @@ Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_
Status ReadCreateAndSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
bool* evict_if_full, std::string* object_data,
std::string* metadata) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
*evict_if_full = message->evict_if_full();
@@ -306,9 +306,9 @@ Status ReadCreateAndSealBatchRequest(uint8_t* data, size_t size,
bool* evict_if_full,
std::vector<std::string>* object_data,
std::vector<std::string>* metadata) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*evict_if_full = message->evict_if_full();
ConvertToVector(message->object_ids(), object_ids,
@@ -332,9 +332,9 @@ Status SendCreateAndSealReply(int sock, PlasmaError error) {
}
Status ReadCreateAndSealReply(uint8_t* data, size_t size) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
return PlasmaErrorStatus(message->error());
}
@@ -346,9 +346,9 @@ Status SendCreateAndSealBatchReply(int sock, PlasmaError error) {
}
Status ReadCreateAndSealBatchReply(uint8_t* data, size_t size) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
return PlasmaErrorStatus(message->error());
}
@@ -359,9 +359,9 @@ Status SendAbortRequest(int sock, ObjectID object_id) {
}
Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaAbortRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK();
}
@@ -373,9 +373,9 @@ Status SendAbortReply(int sock, ObjectID object_id) {
}
Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaAbortReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK();
}
@@ -389,9 +389,9 @@ Status SendSealRequest(int sock, ObjectID object_id) {
}
Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK();
}
@@ -404,9 +404,9 @@ Status SendSealReply(int sock, ObjectID object_id, PlasmaError error) {
}
Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaSealReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
return PlasmaErrorStatus(message->error());
}
@@ -421,9 +421,9 @@ Status SendReleaseRequest(int sock, ObjectID object_id) {
}
Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK();
}
@@ -436,9 +436,9 @@ Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error) {
}
Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaReleaseReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
return PlasmaErrorStatus(message->error());
}
@@ -456,10 +456,10 @@ Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids) {
Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids) {
using fb::PlasmaDeleteRequest;
DCHECK(data);
DCHECK(object_ids);
RAY_DCHECK(data);
RAY_DCHECK(object_ids);
auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
ToVector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) {
return ObjectID::FromBinary(request.object_ids()->Get(i)->str());
});
@@ -468,7 +468,7 @@ Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* obje
Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
const std::vector<PlasmaError>& errors) {
DCHECK(object_ids.size() == errors.size());
RAY_DCHECK(object_ids.size() == errors.size());
flatbuffers::FlatBufferBuilder fbb;
auto message = fb::CreatePlasmaDeleteReply(
fbb, static_cast<int32_t>(object_ids.size()),
@@ -483,11 +483,11 @@ Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object
std::vector<PlasmaError>* errors) {
using fb::PlasmaDeleteReply;
DCHECK(data);
DCHECK(object_ids);
DCHECK(errors);
RAY_DCHECK(data);
RAY_DCHECK(object_ids);
RAY_DCHECK(errors);
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
ToVector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) {
return ObjectID::FromBinary(request.object_ids()->Get(i)->str());
});
@@ -507,9 +507,9 @@ Status SendContainsRequest(int sock, ObjectID object_id) {
}
Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaContainsRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
return Status::OK();
}
@@ -523,9 +523,9 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
bool* has_object) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaContainsReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
*has_object = message->has_object();
return Status::OK();
@@ -548,9 +548,9 @@ Status SendConnectReply(int sock, int64_t memory_capacity) {
}
Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaConnectReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*memory_capacity = message->memory_capacity();
return Status::OK();
}
@@ -564,9 +564,9 @@ Status SendEvictRequest(int sock, int64_t num_bytes) {
}
Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaEvictRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*num_bytes = message->num_bytes();
return Status::OK();
}
@@ -578,9 +578,9 @@ Status SendEvictReply(int sock, int64_t num_bytes) {
}
Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaEvictReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
num_bytes = message->num_bytes();
return Status::OK();
}
@@ -597,9 +597,9 @@ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
int64_t* timeout_ms) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaGetRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
auto object_id = message->object_ids()->Get(i)->str();
object_ids.push_back(ObjectID::FromBinary(object_id));
@@ -642,12 +642,12 @@ Status SendGetReply(int sock, ObjectID object_ids[],
Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
PlasmaObject plasma_objects[], int64_t num_objects,
std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaGetReply>(data);
#ifdef PLASMA_CUDA
int handle_pos = 0;
#endif
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::FromBinary(message->object_ids()->Get(i)->str());
}
@@ -668,7 +668,7 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
}
#endif
}
ARROW_CHECK(message->store_fds()->size() == message->mmap_sizes()->size());
RAY_CHECK(message->store_fds()->size() == message->mmap_sizes()->size());
for (uoffset_t i = 0; i < message->store_fds()->size(); i++) {
store_fds.push_back(message->store_fds()->Get(i));
mmap_sizes.push_back(message->mmap_sizes()->Get(i));
@@ -696,10 +696,10 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
int* port) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaDataRequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
DCHECK(message->object_id()->size() == sizeof(ObjectID));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(message->object_id()->size() == sizeof(ObjectID));
*object_id = ObjectID::FromBinary(message->object_id()->str());
#ifdef _WIN32
*address = _strdup(message->address()->c_str());
@@ -720,9 +720,9 @@ Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
int64_t* object_size, int64_t* metadata_size) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaDataReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::FromBinary(message->object_id()->str());
*object_size = static_cast<int64_t>(message->object_size());
*metadata_size = static_cast<int64_t>(message->metadata_size());
@@ -742,9 +742,9 @@ Status SendRefreshLRURequest(int sock, const std::vector<ObjectID>& object_ids)
Status ReadRefreshLRURequest(uint8_t* data, size_t size,
std::vector<ObjectID>* object_ids) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaRefreshLRURequest>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
auto object_id = message->object_ids()->Get(i)->str();
object_ids->push_back(ObjectID::FromBinary(object_id));
@@ -759,9 +759,9 @@ Status SendRefreshLRUReply(int sock) {
}
Status ReadRefreshLRUReply(uint8_t* data, size_t size) {
DCHECK(data);
RAY_DCHECK(data);
auto message = flatbuffers::GetRoot<fb::PlasmaRefreshLRUReply>(data);
DCHECK(VerifyFlatbuffer(message, data, size));
RAY_DCHECK(VerifyFlatbuffer(message, data, size));
return Status::OK();
}
@@ -47,13 +47,13 @@ void QuotaAwarePolicy::ObjectCreated(const ObjectID& object_id, Client* client,
bool QuotaAwarePolicy::SetClientQuota(Client* client, int64_t output_memory_quota) {
if (per_client_cache_.find(client) != per_client_cache_.end()) {
ARROW_LOG(WARNING) << "Cannot change the client quota once set";
RAY_LOG(WARNING) << "Cannot change the client quota once set";
return false;
}
if (cache_.Capacity() - output_memory_quota <
cache_.OriginalCapacity() * kGlobalLruReserveFraction) {
ARROW_LOG(WARNING) << "Not enough memory to set client quota: " << DebugString();
RAY_LOG(WARNING) << "Not enough memory to set client quota: " << DebugString();
return false;
}
@@ -72,7 +72,7 @@ bool QuotaAwarePolicy::EnforcePerClientQuota(Client* client, int64_t size, bool
auto& client_cache = per_client_cache_[client];
if (size > client_cache->Capacity()) {
ARROW_LOG(WARNING) << "object too large (" << size
RAY_LOG(WARNING) << "object too large (" << size
<< " bytes) to fit in client quota " << client_cache->Capacity()
<< " " << DebugString();
return false;
+44 -44
View File
@@ -180,7 +180,7 @@ uint8_t* PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, int* fd,
if (pointer != nullptr) {
GetMallocMapinfo(pointer, fd, map_size, offset);
ARROW_CHECK(*fd != -1);
RAY_CHECK(*fd != -1);
}
return pointer;
}
@@ -209,7 +209,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f
int64_t data_size, int64_t metadata_size,
int device_num, Client* client,
PlasmaObject* result) {
ARROW_LOG(DEBUG) << "creating object " << object_id.Hex();
RAY_LOG(DEBUG) << "creating object " << object_id.Hex();
auto entry = GetObjectTableEntry(&store_info_, object_id);
if (entry != nullptr) {
@@ -228,7 +228,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f
pointer =
AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true);
if (!pointer) {
ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.Hex()
RAY_LOG(ERROR) << "Not enough memory to create the object " << object_id.Hex()
<< ", data_size=" << data_size
<< ", metadata_size=" << metadata_size
<< ", will send a reply of PlasmaError::OutOfMemory";
@@ -240,12 +240,12 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f
std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle;
auto st = AllocateCudaMemory(device_num, total_size, &pointer, &ipc_handle);
if (!st.ok()) {
ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString();
RAY_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString();
return PlasmaError::OutOfMemory;
}
result->ipc_handle = ipc_handle;
#else
ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled";
RAY_LOG(ERROR) << "device_num != 0 but CUDA not enabled";
return PlasmaError::OutOfMemory;
#endif
}
@@ -284,9 +284,9 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f
}
void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
DCHECK(object != nullptr);
DCHECK(entry != nullptr);
DCHECK(entry->state == ObjectState::PLASMA_SEALED);
RAY_DCHECK(object != nullptr);
RAY_DCHECK(entry != nullptr);
RAY_DCHECK(entry->state == ObjectState::PLASMA_SEALED);
#ifdef PLASMA_CUDA
if (entry->device_num != 0) {
object->ipc_handle = entry->ipc_handle;
@@ -321,7 +321,7 @@ void PlasmaStore::RemoveGetRequest(GetRequest* get_request) {
}
// Remove the get request.
if (get_request->timer != -1) {
ARROW_CHECK(loop_->RemoveTimer(get_request->timer) == kEventLoopOk);
RAY_CHECK(loop_->RemoveTimer(get_request->timer) == kEventLoopOk);
}
delete get_request;
}
@@ -338,7 +338,7 @@ void PlasmaStore::RemoveGetRequestsForClient(Client* client) {
// It shouldn't be possible for a given client to be in the middle of multiple get
// requests.
ARROW_CHECK(get_requests_to_remove.size() <= 1);
RAY_CHECK(get_requests_to_remove.size() <= 1);
for (GetRequest* get_request : get_requests_to_remove) {
RemoveGetRequest(get_request);
}
@@ -399,7 +399,7 @@ void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) {
for (size_t i = 0; i < num_requests; ++i) {
auto get_req = get_requests[index];
auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != nullptr);
RAY_CHECK(entry != nullptr);
PlasmaObject_init(&get_req->objects[object_id], entry);
get_req->num_satisfied += 1;
@@ -447,7 +447,7 @@ void PlasmaStore::ProcessGetRequest(Client* client,
AddToClientObjectIds(object_id, entry, client);
} else if (entry && entry->state == ObjectState::PLASMA_EVICTED) {
// Make sure the object pointer is not already allocated
ARROW_CHECK(!entry->pointer);
RAY_CHECK(!entry->pointer);
entry->pointer =
AllocateMemory(entry->data_size + entry->metadata_size, /*evict=*/true,
@@ -478,7 +478,7 @@ void PlasmaStore::ProcessGetRequest(Client* client,
if (!evicted_ids.empty()) {
std::vector<std::shared_ptr<Buffer>> buffers;
for (size_t i = 0; i < evicted_ids.size(); ++i) {
ARROW_CHECK(evicted_entries[i]->pointer != nullptr);
RAY_CHECK(evicted_entries[i]->pointer != nullptr);
buffers.emplace_back(new arrow::MutableBuffer(evicted_entries[i]->pointer,
evicted_entries[i]->data_size));
}
@@ -550,7 +550,7 @@ void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) {
PlasmaAllocator::Free(object->pointer, buff_size);
} else {
#ifdef PLASMA_CUDA
ARROW_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer));
RAY_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer));
#endif
}
store_info_.objects.erase(object_id);
@@ -558,9 +558,9 @@ void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) {
void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) {
auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != nullptr);
RAY_CHECK(entry != nullptr);
// Remove the client from the object's array of clients.
ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
RAY_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
}
// Check if an object is present.
@@ -575,12 +575,12 @@ ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) {
void PlasmaStore::SealObjects(const std::vector<ObjectID>& object_ids) {
std::vector<ObjectInfoT> infos;
ARROW_LOG(DEBUG) << "sealing " << object_ids.size() << " objects";
RAY_LOG(DEBUG) << "sealing " << object_ids.size() << " objects";
for (size_t i = 0; i < object_ids.size(); ++i) {
ObjectInfoT object_info;
auto entry = GetObjectTableEntry(&store_info_, object_ids[i]);
ARROW_CHECK(entry != nullptr);
ARROW_CHECK(entry->state == ObjectState::PLASMA_CREATED);
RAY_CHECK(entry != nullptr);
RAY_CHECK(entry->state == ObjectState::PLASMA_CREATED);
// Set the state of object to SEALED.
entry->state = ObjectState::PLASMA_SEALED;
// Set object construction duration.
@@ -601,8 +601,8 @@ void PlasmaStore::SealObjects(const std::vector<ObjectID>& object_ids) {
int PlasmaStore::AbortObject(const ObjectID& object_id, Client* client) {
auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != nullptr) << "To abort an object it must be in the object table.";
ARROW_CHECK(entry->state != ObjectState::PLASMA_SEALED)
RAY_CHECK(entry != nullptr) << "To abort an object it must be in the object table.";
RAY_CHECK(entry->state != ObjectState::PLASMA_SEALED)
<< "To abort an object it must not have been sealed.";
auto it = client->object_ids.find(object_id);
if (it == client->object_ids.end()) {
@@ -660,15 +660,15 @@ void PlasmaStore::EvictObjects(const std::vector<ObjectID>& object_ids) {
std::vector<std::shared_ptr<arrow::Buffer>> evicted_object_data;
std::vector<ObjectTableEntry*> evicted_entries;
for (const auto& object_id : object_ids) {
ARROW_LOG(DEBUG) << "evicting object " << object_id.Hex();
RAY_LOG(DEBUG) << "evicting object " << object_id.Hex();
auto entry = GetObjectTableEntry(&store_info_, object_id);
// TODO(rkn): This should probably not fail, but should instead throw an
// error. Maybe we should also support deleting objects that have been
// created but not sealed.
ARROW_CHECK(entry != nullptr) << "To evict an object it must be in the object table.";
ARROW_CHECK(entry->state == ObjectState::PLASMA_SEALED)
RAY_CHECK(entry != nullptr) << "To evict an object it must be in the object table.";
RAY_CHECK(entry->state == ObjectState::PLASMA_SEALED)
<< "To evict an object it must have been sealed.";
ARROW_CHECK(entry->ref_count == 0)
RAY_CHECK(entry->ref_count == 0)
<< "To evict an object, there must be no clients currently using it.";
// If there is a backing external store, then mark object for eviction to
@@ -691,7 +691,7 @@ void PlasmaStore::EvictObjects(const std::vector<ObjectID>& object_ids) {
}
if (external_store_ && !object_ids.empty()) {
ARROW_CHECK_OK(external_store_->Put(object_ids, evicted_object_data));
RAY_ARROW_CHECK_OK(external_store_->Put(object_ids, evicted_object_data));
for (auto entry : evicted_entries) {
PlasmaAllocator::Free(entry->pointer, entry->data_size + entry->metadata_size);
entry->pointer = nullptr;
@@ -711,20 +711,20 @@ void PlasmaStore::ConnectClient(int listener_sock) {
loop_->AddFileEvent(client_fd, kEventLoopRead, [this, client](int events) {
Status s = ProcessMessage(client);
if (!s.ok()) {
ARROW_LOG(FATAL) << "Failed to process file event: " << s;
RAY_LOG(FATAL) << "Failed to process file event: " << s;
}
});
ARROW_LOG(DEBUG) << "New connection with fd " << client_fd;
RAY_LOG(DEBUG) << "New connection with fd " << client_fd;
}
void PlasmaStore::DisconnectClient(int client_fd) {
ARROW_CHECK(client_fd > 0);
RAY_CHECK(client_fd > 0);
auto it = connected_clients_.find(client_fd);
ARROW_CHECK(it != connected_clients_.end());
RAY_CHECK(it != connected_clients_.end());
loop_->RemoveFileEvent(client_fd);
// Close the socket.
close(client_fd);
ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
RAY_LOG(INFO) << "Disconnecting client on fd " << client_fd;
// Release all the objects that the client was using.
auto client = it->second.get();
eviction_policy_.ClientDisconnected(client);
@@ -794,10 +794,10 @@ PlasmaStore::NotificationMap::iterator PlasmaStore::SendNotifications(
// Attempt to send a notification about this object ID.
ssize_t nbytes = send(client_fd, notification.get(), sizeof(int64_t) + size, 0);
if (nbytes >= 0) {
ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
RAY_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
} else if (nbytes == -1 &&
(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
ARROW_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this "
RAY_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this "
"notification and will send it later.";
// Add a callback to the event loop to send queued notifications whenever
// there is room in the socket's send buffer. Callbacks can be added
@@ -810,7 +810,7 @@ PlasmaStore::NotificationMap::iterator PlasmaStore::SendNotifications(
});
break;
} else {
ARROW_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd;
RAY_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd;
if (errno == EPIPE) {
closed = true;
break;
@@ -868,7 +868,7 @@ void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info, int client_fd)
// Subscribe to notifications about sealed objects.
void PlasmaStore::SubscribeToUpdates(Client* client) {
ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
RAY_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
if (client->notification_fd > 0) {
// This client has already subscribed. Return.
return;
@@ -879,7 +879,7 @@ void PlasmaStore::SubscribeToUpdates(Client* client) {
int fd = recv_fd(client->fd);
if (fd < 0) {
// This may mean that the client died before sending the file descriptor.
ARROW_LOG(WARNING) << "Failed to receive file descriptor from client on fd "
RAY_LOG(WARNING) << "Failed to receive file descriptor from client on fd "
<< client->fd << ".";
return;
}
@@ -903,7 +903,7 @@ void PlasmaStore::SubscribeToUpdates(Client* client) {
Status PlasmaStore::ProcessMessage(Client* client) {
fb::MessageType type;
Status s = ReadMessage(client->fd, &type, &input_buffer_);
ARROW_CHECK(s.ok() || s.IsIOError());
RAY_CHECK(s.ok() || s.IsIOError());
uint8_t* input = input_buffer_.data();
size_t input_size = input_buffer_.size();
@@ -951,7 +951,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
// If the object was successfully created, fill out the object data and seal it.
if (error_code == PlasmaError::OK) {
auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != nullptr);
RAY_CHECK(entry != nullptr);
// Write the inlined data and metadata into the allocated object.
std::memcpy(entry->pointer, data.data(), data.size());
std::memcpy(entry->pointer + data.size(), metadata.data(), metadata.size());
@@ -960,7 +960,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
// object is not being used by any client. The client was added to the
// object's array of clients in CreateObject. This is analogous to the
// Release call that happens in the client's Seal method.
ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
RAY_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
}
// Reply to the client.
@@ -993,7 +993,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
if (error_code == PlasmaError::OK) {
for (i = 0; i < object_ids.size(); i++) {
auto entry = GetObjectTableEntry(&store_info_, object_ids[i]);
ARROW_CHECK(entry != nullptr);
RAY_CHECK(entry != nullptr);
// Write the inlined data and metadata into the allocated object.
std::memcpy(entry->pointer, data[i].data(), data[i].size());
std::memcpy(entry->pointer + data[i].size(), metadata[i].data(),
@@ -1007,7 +1007,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
// Release call that happens in the client's Seal method.
for (i = 0; i < object_ids.size(); i++) {
auto entry = GetObjectTableEntry(&store_info_, object_ids[i]);
ARROW_CHECK(RemoveFromClientObjectIds(object_ids[i], entry, client) == 1);
RAY_CHECK(RemoveFromClientObjectIds(object_ids[i], entry, client) == 1);
}
} else {
for (size_t j = 0; j < i; j++) {
@@ -1019,7 +1019,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
} break;
case fb::MessageType::PlasmaAbortRequest: {
RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
ARROW_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only "
RAY_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only "
"client currently using it "
"must be the creator.";
HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
@@ -1081,7 +1081,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
client->fd);
} break;
case fb::MessageType::PlasmaDisconnectClient:
ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
RAY_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
DisconnectClient(client->fd);
break;
case fb::MessageType::PlasmaSetOptionsRequest: {
@@ -1101,7 +1101,7 @@ Status PlasmaStore::ProcessMessage(Client* client) {
} break;
default:
// This code should be unreachable.
ARROW_CHECK(0);
RAY_CHECK(0);
}
return Status::OK();
}
+14 -19
View File
@@ -11,31 +11,27 @@
namespace plasma {
using arrow::util::ArrowLog;
using arrow::util::ArrowLogLevel;
void SetMallocGranularity(int value);
PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_memory,
bool hugepages_enabled, std::string plasma_directory,
const std::string external_store_endpoint):
hugepages_enabled_(hugepages_enabled), external_store_endpoint_(external_store_endpoint) {
ArrowLog::StartArrowLog("plasma_store", ArrowLogLevel::ARROW_INFO);
// Sanity check.
if (socket_name.empty()) {
ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch";
RAY_LOG(FATAL) << "please specify socket for incoming connections with -s switch";
}
socket_name_ = socket_name;
if (system_memory == -1) {
ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch";
RAY_LOG(FATAL) << "please specify the amount of system memory with -m switch";
}
// Set system memory capacity
PlasmaAllocator::SetFootprintLimit(static_cast<size_t>(system_memory));
ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
RAY_LOG(INFO) << "Allowing the Plasma store to use up to "
<< static_cast<double>(system_memory) / 1000000000
<< "GB of memory.";
if (hugepages_enabled && plasma_directory.empty()) {
ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages "
RAY_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages "
"filesystem with -d";
}
if (plasma_directory.empty()) {
@@ -45,7 +41,7 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_mem
plasma_directory = "/tmp";
#endif
}
ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory
RAY_LOG(INFO) << "Starting object store with directory " << plasma_directory
<< " and huge page support "
<< (hugepages_enabled ? "enabled" : "disabled");
#ifdef __linux__
@@ -62,7 +58,7 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_mem
// Keep some safety margin for allocator fragmentation.
shm_mem_avail = 9 * shm_mem_avail / 10;
if (system_memory > shm_mem_avail) {
ARROW_LOG(WARNING)
RAY_LOG(WARNING)
<< "System memory request exceeds memory available in " << plasma_directory
<< ". The request is for " << system_memory
<< " bytes, and the amount available is " << shm_mem_avail
@@ -88,16 +84,16 @@ void PlasmaStoreRunner::Start() {
std::shared_ptr<plasma::ExternalStore> external_store{nullptr};
if (!external_store_endpoint_.empty()) {
std::string name;
ARROW_CHECK_OK(
RAY_ARROW_CHECK_OK(
plasma::ExternalStores::ExtractStoreName(external_store_endpoint_, &name));
external_store = plasma::ExternalStores::GetStore(name);
if (external_store == nullptr) {
ARROW_LOG(FATAL) << "No such external store \"" << name << "\"";
RAY_LOG(FATAL) << "No such external store \"" << name << "\"";
}
ARROW_LOG(DEBUG) << "connecting to external store...";
ARROW_CHECK_OK(external_store->Connect(external_store_endpoint_));
RAY_LOG(DEBUG) << "connecting to external store...";
RAY_ARROW_CHECK_OK(external_store->Connect(external_store_endpoint_));
}
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name_;
RAY_LOG(DEBUG) << "starting server listening on " << socket_name_;
// Create the event loop.
loop_.reset(new EventLoop);
@@ -111,7 +107,7 @@ void PlasmaStoreRunner::Start() {
// bookkeeping.
void* pointer = PlasmaAllocator::Memalign(
kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
ARROW_CHECK(pointer != nullptr);
RAY_CHECK(pointer != nullptr);
// This will unmap the file, but the next one created will be as large
// as this one (this is an implementation detail of dlmalloc).
PlasmaAllocator::Free(
@@ -119,7 +115,7 @@ void PlasmaStoreRunner::Start() {
int socket = ConnectOrListenIpcSock(socket_name_, true);
// TODO(pcm): Check return value.
ARROW_CHECK(socket >= 0);
RAY_CHECK(socket >= 0);
loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) {
this->store_->ConnectClient(socket);
@@ -130,14 +126,13 @@ void PlasmaStoreRunner::Start() {
#ifdef _WINSOCKAPI_
WSACleanup();
#endif
ArrowLog::ShutDownArrowLog();
}
void PlasmaStoreRunner::Stop() {
if (loop_) {
loop_->Stop();
} else {
ARROW_LOG(ERROR) << "Expected loop_ to be non-NULL; this may be a bug";
RAY_LOG(ERROR) << "Expected loop_ to be non-NULL; this may be a bug";
}
}
File diff suppressed because it is too large Load Diff
@@ -1,143 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <assert.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <memory>
#include <thread>
#include <gtest/gtest.h>
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"
#include "ray/object_manager/plasma/client.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/external_store.h"
#include "ray/object_manager/plasma/plasma.h"
#include "ray/object_manager/plasma/protocol.h"
#include "ray/object_manager/plasma/test_util.h"
namespace plasma {
using arrow::internal::TemporaryDir;
std::string external_test_executable; // NOLINT
void AssertObjectBufferEqual(const ObjectBuffer& object_buffer,
const std::string& metadata, const std::string& data) {
arrow::AssertBufferEqual(*object_buffer.metadata, metadata);
arrow::AssertBufferEqual(*object_buffer.data, data);
}
class TestPlasmaStoreWithExternal : public ::testing::Test {
public:
// TODO(pcm): At the moment, stdout of the test gets mixed up with
// stdout of the object store. Consider changing that.
void SetUp() override {
ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("ext-test-"));
store_socket_name_ = temp_dir_->path().ToString() + "store";
std::string plasma_directory =
external_test_executable.substr(0, external_test_executable.find_last_of('/'));
std::string plasma_command = plasma_directory +
"/plasma-store-server -m 1024000 -e " +
"hashtable://test -s " + store_socket_name_ +
" 1> /tmp/log.stdout 2> /tmp/log.stderr & " +
"echo $! > " + store_socket_name_ + ".pid";
PLASMA_CHECK_SYSTEM(system(plasma_command.c_str()));
ARROW_CHECK_OK(client_.Connect(store_socket_name_, ""));
}
void TearDown() override {
ARROW_CHECK_OK(client_.Disconnect());
// Kill plasma_store process that we started
#ifdef COVERAGE_BUILD
// Ask plasma_store to exit gracefully and give it time to write out
// coverage files
std::string plasma_term_command =
"kill -TERM `cat " + store_socket_name_ + ".pid` || exit 0";
PLASMA_CHECK_SYSTEM(system(plasma_term_command.c_str()));
std::this_thread::sleep_for(std::chrono::milliseconds(200));
#endif
std::string plasma_kill_command =
"kill -KILL `cat " + store_socket_name_ + ".pid` || exit 0";
PLASMA_CHECK_SYSTEM(system(plasma_kill_command.c_str()));
}
protected:
PlasmaClient client_;
std::unique_ptr<TemporaryDir> temp_dir_;
std::string store_socket_name_;
};
TEST_F(TestPlasmaStoreWithExternal, EvictionTest) {
std::vector<ObjectID> object_ids;
std::string data(100 * 1024, 'x');
std::string metadata;
for (int i = 0; i < 20; i++) {
ObjectID object_id = random_object_id();
object_ids.push_back(object_id);
// Test for object non-existence.
bool has_object;
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
ASSERT_FALSE(has_object);
// Test for the object being in local Plasma store.
// Create and seal the object.
ARROW_CHECK_OK(client_.CreateAndSeal(object_id, data, metadata));
// Test that the client can get the object.
ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
ASSERT_TRUE(has_object);
}
for (int i = 0; i < 20; i++) {
// Since we are accessing objects sequentially, every object we
// access would be a cache "miss" owing to LRU eviction.
// Try and access the object from the plasma store first, and then try
// external store on failure. This should succeed to fetch the object.
// However, it may evict the next few objects.
std::vector<ObjectBuffer> object_buffers;
ARROW_CHECK_OK(client_.Get({object_ids[i]}, -1, &object_buffers));
ASSERT_EQ(object_buffers.size(), 1);
ASSERT_EQ(object_buffers[0].device_num, 0);
ASSERT_TRUE(object_buffers[0].data);
AssertObjectBufferEqual(object_buffers[0], metadata, data);
}
// Make sure we still cannot fetch objects that do not exist
std::vector<ObjectBuffer> object_buffers;
ARROW_CHECK_OK(client_.Get({random_object_id()}, 100, &object_buffers));
ASSERT_EQ(object_buffers.size(), 1);
ASSERT_EQ(object_buffers[0].device_num, 0);
ASSERT_EQ(object_buffers[0].data, nullptr);
ASSERT_EQ(object_buffers[0].metadata, nullptr);
}
} // namespace plasma
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
plasma::external_test_executable = std::string(argv[0]);
return RUN_ALL_TESTS();
}
@@ -1,333 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <sstream>
#include <sys/types.h>
#include <unistd.h>
#include <gtest/gtest.h>
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/io.h"
#include "ray/object_manager/plasma/plasma.h"
#include "ray/object_manager/plasma/protocol.h"
#include "ray/object_manager/plasma/test_util.h"
namespace fb = plasma::flatbuf;
namespace plasma {
using arrow::internal::TemporaryDir;
/**
* Seek to the beginning of a file and read a message from it.
*
* \param fd File descriptor of the file.
* \param message_type Message type that we expect in the file.
*
* \return Pointer to the content of the message. Needs to be freed by the
* caller.
*/
std::vector<uint8_t> read_message_from_file(int fd, MessageType message_type) {
/* Go to the beginning of the file. */
lseek(fd, 0, SEEK_SET);
MessageType type;
std::vector<uint8_t> data;
Status s = ReadMessage(fd, &type, &data);
DCHECK_OK(s);
DCHECK_EQ(type, message_type);
return data;
}
PlasmaObject random_plasma_object(void) {
unsigned int seed = static_cast<unsigned int>(time(NULL));
int random = rand_r(&seed);
PlasmaObject object = {};
object.store_fd = random + 7;
object.data_offset = random + 1;
object.metadata_offset = random + 2;
object.data_size = random + 3;
object.metadata_size = random + 4;
object.device_num = 0;
return object;
}
class TestPlasmaSerialization : public ::testing::Test {
public:
void SetUp() { ASSERT_OK_AND_ASSIGN(temp_dir_, TemporaryDir::Make("ser-test-")); }
// Create a temporary file.
// A fd is returned which must be closed manually. The file itself
// is deleted at the end of the test.
int CreateTemporaryFile(void) {
char path[1024];
std::stringstream ss;
ss << temp_dir_->path().ToString() << "fileXXXXXX";
strncpy(path, ss.str().c_str(), sizeof(path));
ARROW_LOG(INFO) << "file path: '" << path << "'";
return mkstemp(path);
}
protected:
std::unique_ptr<TemporaryDir> temp_dir_;
};
TEST_F(TestPlasmaSerialization, CreateRequest) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
int64_t data_size1 = 42;
int64_t metadata_size1 = 11;
int device_num1 = 0;
ASSERT_OK(SendCreateRequest(fd, object_id1, /*evict_if_full=*/true, data_size1,
metadata_size1, device_num1));
std::vector<uint8_t> data =
read_message_from_file(fd, MessageType::PlasmaCreateRequest);
ObjectID object_id2;
bool evict_if_full;
int64_t data_size2;
int64_t metadata_size2;
int device_num2;
ASSERT_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &evict_if_full,
&data_size2, &metadata_size2, &device_num2));
ASSERT_TRUE(evict_if_full);
ASSERT_EQ(data_size1, data_size2);
ASSERT_EQ(metadata_size1, metadata_size2);
ASSERT_EQ(object_id1, object_id2);
ASSERT_EQ(device_num1, device_num2);
close(fd);
}
TEST_F(TestPlasmaSerialization, CreateReply) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
PlasmaObject object1 = random_plasma_object();
int64_t mmap_size1 = 1000000;
ASSERT_OK(SendCreateReply(fd, object_id1, &object1, PlasmaError::OK, mmap_size1));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaCreateReply);
ObjectID object_id2;
PlasmaObject object2 = {};
int store_fd;
int64_t mmap_size2;
ASSERT_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2, &store_fd,
&mmap_size2));
ASSERT_EQ(object_id1, object_id2);
ASSERT_EQ(object1.store_fd, store_fd);
ASSERT_EQ(mmap_size1, mmap_size2);
ASSERT_EQ(memcmp(&object1, &object2, sizeof(object1)), 0);
close(fd);
}
TEST_F(TestPlasmaSerialization, SealRequest) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
std::string digest1 = std::string(kDigestSize, 7);
ASSERT_OK(SendSealRequest(fd, object_id1, digest1));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaSealRequest);
ObjectID object_id2;
std::string digest2;
ASSERT_OK(ReadSealRequest(data.data(), data.size(), &object_id2, &digest2));
ASSERT_EQ(object_id1, object_id2);
ASSERT_EQ(memcmp(digest1.data(), digest2.data(), kDigestSize), 0);
close(fd);
}
TEST_F(TestPlasmaSerialization, SealReply) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
ASSERT_OK(SendSealReply(fd, object_id1, PlasmaError::ObjectExists));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaSealReply);
ObjectID object_id2;
Status s = ReadSealReply(data.data(), data.size(), &object_id2);
ASSERT_EQ(object_id1, object_id2);
ASSERT_TRUE(IsPlasmaObjectExists(s));
close(fd);
}
TEST_F(TestPlasmaSerialization, GetRequest) {
int fd = CreateTemporaryFile();
ObjectID object_ids[2];
object_ids[0] = random_object_id();
object_ids[1] = random_object_id();
int64_t timeout_ms = 1234;
ASSERT_OK(SendGetRequest(fd, object_ids, 2, timeout_ms));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaGetRequest);
std::vector<ObjectID> object_ids_return;
int64_t timeout_ms_return;
ASSERT_OK(
ReadGetRequest(data.data(), data.size(), object_ids_return, &timeout_ms_return));
ASSERT_EQ(object_ids[0], object_ids_return[0]);
ASSERT_EQ(object_ids[1], object_ids_return[1]);
ASSERT_EQ(timeout_ms, timeout_ms_return);
close(fd);
}
TEST_F(TestPlasmaSerialization, GetReply) {
int fd = CreateTemporaryFile();
ObjectID object_ids[2];
object_ids[0] = random_object_id();
object_ids[1] = random_object_id();
std::unordered_map<ObjectID, PlasmaObject> plasma_objects;
plasma_objects[object_ids[0]] = random_plasma_object();
plasma_objects[object_ids[1]] = random_plasma_object();
std::vector<int> store_fds = {1, 2, 3};
std::vector<int64_t> mmap_sizes = {100, 200, 300};
ASSERT_OK(SendGetReply(fd, object_ids, plasma_objects, 2, store_fds, mmap_sizes));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaGetReply);
ObjectID object_ids_return[2];
PlasmaObject plasma_objects_return[2];
std::vector<int> store_fds_return;
std::vector<int64_t> mmap_sizes_return;
memset(&plasma_objects_return, 0, sizeof(plasma_objects_return));
ASSERT_OK(ReadGetReply(data.data(), data.size(), object_ids_return,
&plasma_objects_return[0], 2, store_fds_return,
mmap_sizes_return));
ASSERT_EQ(object_ids[0], object_ids_return[0]);
ASSERT_EQ(object_ids[1], object_ids_return[1]);
PlasmaObject po, po2;
for (int i = 0; i < 2; ++i) {
po = plasma_objects[object_ids[i]];
po2 = plasma_objects_return[i];
ASSERT_EQ(po, po2);
}
ASSERT_TRUE(store_fds == store_fds_return);
ASSERT_TRUE(mmap_sizes == mmap_sizes_return);
close(fd);
}
TEST_F(TestPlasmaSerialization, ReleaseRequest) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
ASSERT_OK(SendReleaseRequest(fd, object_id1));
std::vector<uint8_t> data =
read_message_from_file(fd, MessageType::PlasmaReleaseRequest);
ObjectID object_id2;
ASSERT_OK(ReadReleaseRequest(data.data(), data.size(), &object_id2));
ASSERT_EQ(object_id1, object_id2);
close(fd);
}
TEST_F(TestPlasmaSerialization, ReleaseReply) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
ASSERT_OK(SendReleaseReply(fd, object_id1, PlasmaError::ObjectExists));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaReleaseReply);
ObjectID object_id2;
Status s = ReadReleaseReply(data.data(), data.size(), &object_id2);
ASSERT_EQ(object_id1, object_id2);
ASSERT_TRUE(IsPlasmaObjectExists(s));
close(fd);
}
TEST_F(TestPlasmaSerialization, DeleteRequest) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
ASSERT_OK(SendDeleteRequest(fd, std::vector<ObjectID>{object_id1}));
std::vector<uint8_t> data =
read_message_from_file(fd, MessageType::PlasmaDeleteRequest);
std::vector<ObjectID> object_vec;
ASSERT_OK(ReadDeleteRequest(data.data(), data.size(), &object_vec));
ASSERT_EQ(object_vec.size(), 1);
ASSERT_EQ(object_id1, object_vec[0]);
close(fd);
}
TEST_F(TestPlasmaSerialization, DeleteReply) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
PlasmaError error1 = PlasmaError::ObjectExists;
ASSERT_OK(SendDeleteReply(fd, std::vector<ObjectID>{object_id1},
std::vector<PlasmaError>{error1}));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaDeleteReply);
std::vector<ObjectID> object_vec;
std::vector<PlasmaError> error_vec;
Status s = ReadDeleteReply(data.data(), data.size(), &object_vec, &error_vec);
ASSERT_EQ(object_vec.size(), 1);
ASSERT_EQ(object_id1, object_vec[0]);
ASSERT_EQ(error_vec.size(), 1);
ASSERT_TRUE(error_vec[0] == PlasmaError::ObjectExists);
ASSERT_TRUE(s.ok());
close(fd);
}
TEST_F(TestPlasmaSerialization, EvictRequest) {
int fd = CreateTemporaryFile();
int64_t num_bytes = 111;
ASSERT_OK(SendEvictRequest(fd, num_bytes));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaEvictRequest);
int64_t num_bytes_received;
ASSERT_OK(ReadEvictRequest(data.data(), data.size(), &num_bytes_received));
ASSERT_EQ(num_bytes, num_bytes_received);
close(fd);
}
TEST_F(TestPlasmaSerialization, EvictReply) {
int fd = CreateTemporaryFile();
int64_t num_bytes = 111;
ASSERT_OK(SendEvictReply(fd, num_bytes));
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaEvictReply);
int64_t num_bytes_received;
ASSERT_OK(ReadEvictReply(data.data(), data.size(), num_bytes_received));
ASSERT_EQ(num_bytes, num_bytes_received);
close(fd);
}
TEST_F(TestPlasmaSerialization, DataRequest) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
const char* address1 = "address1";
int port1 = 12345;
ASSERT_OK(SendDataRequest(fd, object_id1, address1, port1));
/* Reading message back. */
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaDataRequest);
ObjectID object_id2;
char* address2;
int port2;
ASSERT_OK(ReadDataRequest(data.data(), data.size(), &object_id2, &address2, &port2));
ASSERT_EQ(object_id1, object_id2);
ASSERT_EQ(strcmp(address1, address2), 0);
ASSERT_EQ(port1, port2);
free(address2);
close(fd);
}
TEST_F(TestPlasmaSerialization, DataReply) {
int fd = CreateTemporaryFile();
ObjectID object_id1 = random_object_id();
int64_t object_size1 = 146;
int64_t metadata_size1 = 198;
ASSERT_OK(SendDataReply(fd, object_id1, object_size1, metadata_size1));
/* Reading message back. */
std::vector<uint8_t> data = read_message_from_file(fd, MessageType::PlasmaDataReply);
ObjectID object_id2;
int64_t object_size2;
int64_t metadata_size2;
ASSERT_OK(ReadDataReply(data.data(), data.size(), &object_id2, &object_size2,
&metadata_size2));
ASSERT_EQ(object_id1, object_id2);
ASSERT_EQ(object_size1, object_size2);
ASSERT_EQ(metadata_size1, metadata_size2);
}
} // namespace plasma
-46
View File
@@ -1,46 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <algorithm>
#include <limits>
#include <random>
#include "ray/object_manager/plasma/common.h"
namespace plasma {
ObjectID random_object_id() {
static uint32_t random_seed = 0;
std::mt19937 gen(random_seed++);
std::uniform_int_distribution<uint32_t> d(0, std::numeric_limits<uint8_t>::max());
ObjectID result;
uint8_t* data = result.mutable_data();
std::generate(data, data + kUniqueIDSize,
[&d, &gen] { return static_cast<uint8_t>(d(gen)); });
return result;
}
#define PLASMA_CHECK_SYSTEM(expr) \
do { \
int status__ = (expr); \
EXPECT_TRUE(WIFEXITED(status__)); \
EXPECT_EQ(WEXITSTATUS(status__), 0); \
} while (false);
} // namespace plasma
+10 -9
View File
@@ -12,16 +12,20 @@
#define __STDC_FORMAT_MACROS
#endif
using arrow::util::ArrowLog;
void HandleSignal(int signal) {
if (signal == SIGTERM) {
ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server...";
RAY_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server...";
plasma::plasma_store_runner->Stop();
}
}
int main(int argc, char *argv[]) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
ray::RayLog::ShutDownRayLog, argv[0],
ray::RayLogLevel::INFO,
/*log_dir=*/"");
ray::RayLog::InstallFailureSignalHandler();
std::string socket_name;
// Directory where plasma memory mapped files are stored.
std::string plasma_directory;
@@ -47,7 +51,7 @@ int main(int argc, char *argv[]) {
case 'm': {
char extra;
int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
ARROW_CHECK(scanned == 1);
RAY_CHECK(scanned == 1);
break;
}
case 'z': {
@@ -60,7 +64,6 @@ int main(int argc, char *argv[]) {
}
if (!keep_idle) {
ArrowLog::InstallFailureSignalHandler();
plasma::plasma_store_runner.reset(
new plasma::PlasmaStoreRunner(socket_name, system_memory, hugepages_enabled,
plasma_directory, external_store_endpoint));
@@ -73,11 +76,9 @@ int main(int argc, char *argv[]) {
signal(SIGTERM, HandleSignal);
plasma::plasma_store_runner->Start();
plasma::plasma_store_runner.reset();
ArrowLog::UninstallSignalAction();
} else {
printf(
"The Plasma Store is started with the '-z' flag, "
"and it will run idle as a placeholder.");
RAY_LOG(INFO) << "The Plasma Store is started with the '-z' flag, "
<< "and it will run idle as a placeholder.";
while (true) {
std::this_thread::sleep_for(std::chrono::hours(1000));
}