mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 10:01:50 +08:00
Update arrow and parquet-cpp. (#1875)
* Update arrow.
* Fix bug.
* Cherry-pick commit for fixing parquet segfault.
* Update arrow and revert auto-releasing buffer commit.
* Remove parquet cherry-pick.
This commit is contained in:
committed by
Philipp Moritz
parent
39cf6ff6e1
commit
d0fffec2d0
@@ -802,7 +802,7 @@ void process_transfer_request(event_loop *loop,
|
||||
/* We pass in 0 to indicate that the command should return immediately. */
|
||||
ARROW_CHECK_OK(
|
||||
conn->manager_state->plasma_conn->Get(&object_id, 1, 0, &object_buffer));
|
||||
if (object_buffer.data_size == -1) {
|
||||
if (object_buffer.data == nullptr) {
|
||||
/* If the object wasn't locally available, exit immediately. If the object
|
||||
* later appears locally, the requesting plasma manager should request the
|
||||
* transfer again. */
|
||||
@@ -823,15 +823,15 @@ void process_transfer_request(event_loop *loop,
|
||||
}
|
||||
|
||||
RAY_CHECK(object_buffer.metadata->data() ==
|
||||
object_buffer.data->data() + object_buffer.data_size);
|
||||
object_buffer.data->data() + object_buffer.data->size());
|
||||
PlasmaRequestBuffer *buf = new PlasmaRequestBuffer();
|
||||
buf->type = MessageType_PlasmaDataReply;
|
||||
buf->object_id = obj_id;
|
||||
/* We treat buf->data as a pointer to the concatenated data and metadata, so
|
||||
* we don't actually use buf->metadata. */
|
||||
buf->data = const_cast<uint8_t *>(object_buffer.data->data());
|
||||
buf->data_size = object_buffer.data_size;
|
||||
buf->metadata_size = object_buffer.metadata_size;
|
||||
buf->data_size = object_buffer.data->size();
|
||||
buf->metadata_size = object_buffer.metadata->size();
|
||||
|
||||
manager_conn->transfer_queue.push_back(buf);
|
||||
manager_conn->pending_object_transfers[object_id] = buf;
|
||||
|
||||
@@ -137,7 +137,7 @@ TEST plasma_nonblocking_get_tests(void) {
|
||||
|
||||
/* Test for object non-existence. */
|
||||
ARROW_CHECK_OK(client.Get(oid_array, 1, 0, &obj_buffer));
|
||||
ASSERT(obj_buffer.data_size == -1);
|
||||
ASSERT(obj_buffer.data == nullptr);
|
||||
|
||||
/* Test for the object being in local Plasma store. */
|
||||
/* First create object. */
|
||||
@@ -240,7 +240,7 @@ TEST plasma_get_tests(void) {
|
||||
PLASMA_DEFAULT_RELEASE_DELAY));
|
||||
ObjectID oid1 = ObjectID::from_random();
|
||||
ObjectID oid2 = ObjectID::from_random();
|
||||
ObjectBuffer obj_buffer;
|
||||
ObjectBuffer obj_buffer1;
|
||||
|
||||
ObjectID oid_array1[1] = {oid1};
|
||||
ObjectID oid_array2[1] = {oid2};
|
||||
@@ -254,17 +254,18 @@ TEST plasma_get_tests(void) {
|
||||
init_data_123(data->mutable_data(), data_size, 1);
|
||||
ARROW_CHECK_OK(client1.Seal(oid1));
|
||||
|
||||
ARROW_CHECK_OK(client1.Get(oid_array1, 1, -1, &obj_buffer));
|
||||
ASSERT(data->data()[0] == obj_buffer.data->data()[0]);
|
||||
ARROW_CHECK_OK(client1.Get(oid_array1, 1, -1, &obj_buffer1));
|
||||
ASSERT(data->data()[0] == obj_buffer1.data->data()[0]);
|
||||
|
||||
ObjectBuffer obj_buffer2;
|
||||
ARROW_CHECK_OK(
|
||||
client2.Create(oid2, data_size, metadata, metadata_size, &data));
|
||||
init_data_123(data->mutable_data(), data_size, 2);
|
||||
ARROW_CHECK_OK(client2.Seal(oid2));
|
||||
|
||||
ARROW_CHECK_OK(client1.Fetch(1, oid_array2));
|
||||
ARROW_CHECK_OK(client1.Get(oid_array2, 1, -1, &obj_buffer));
|
||||
ASSERT(data->data()[0] == obj_buffer.data->data()[0]);
|
||||
ARROW_CHECK_OK(client1.Get(oid_array2, 1, -1, &obj_buffer2));
|
||||
ASSERT(data->data()[0] == obj_buffer2.data->data()[0]);
|
||||
|
||||
sleep(1);
|
||||
ARROW_CHECK_OK(client1.Disconnect());
|
||||
|
||||
@@ -300,7 +300,7 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id_const,
|
||||
plasma::ObjectID plasma_id = object_id.to_plasma_id();
|
||||
std::shared_ptr<plasma::PlasmaClient> store_client = store_pool_.GetObjectStore();
|
||||
ARROW_CHECK_OK(store_client->Get(&plasma_id, 1, 0, &object_buffer));
|
||||
if (object_buffer.data_size == -1) {
|
||||
if (object_buffer.data == nullptr) {
|
||||
RAY_LOG(ERROR) << "Failed to get object";
|
||||
// If the object wasn't locally available, exit immediately. If the object
|
||||
// later appears locally, the requesting plasma manager should request the
|
||||
@@ -311,12 +311,12 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id_const,
|
||||
"Unable to transfer object to requesting plasma manager, object not local.");
|
||||
}
|
||||
RAY_CHECK(object_buffer.metadata->data() ==
|
||||
object_buffer.data->data() + object_buffer.data_size);
|
||||
object_buffer.data->data() + object_buffer.data->size());
|
||||
|
||||
TransferQueue::SendContext context;
|
||||
context.client_id = conn->GetClientID();
|
||||
context.object_id = object_id;
|
||||
context.object_size = static_cast<uint64_t>(object_buffer.data_size);
|
||||
context.object_size = static_cast<uint64_t>(object_buffer.data->size());
|
||||
context.data = const_cast<uint8_t *>(object_buffer.data->data());
|
||||
UniqueID context_id = transfer_queue_.AddContext(context);
|
||||
|
||||
|
||||
@@ -286,8 +286,8 @@ class StressTestObjectManager : public TestObjectManagerBase {
|
||||
plasma::ObjectBuffer object_buffer_2 = GetObject(client1, object_id_1);
|
||||
uint8_t *data_1 = const_cast<uint8_t *>(object_buffer_1.data->data());
|
||||
uint8_t *data_2 = const_cast<uint8_t *>(object_buffer_2.data->data());
|
||||
ASSERT_EQ(object_buffer_1.data_size, object_buffer_2.data_size);
|
||||
for (int i = -1; ++i < object_buffer_1.data_size;) {
|
||||
ASSERT_EQ(object_buffer_1.data->size(), object_buffer_2.data->size());
|
||||
for (int i = -1; ++i < object_buffer_1.data->size();) {
|
||||
ASSERT_TRUE(data_1[i] == data_2[i]);
|
||||
}
|
||||
}
|
||||
|
||||
Vendored
+6
-2
@@ -44,10 +44,14 @@ if [[ ! -d $TP_DIR/../python/ray/pyarrow_files/pyarrow ]]; then
|
||||
|
||||
pushd $TP_DIR/build/arrow
|
||||
git fetch origin master
|
||||
# The PR for this commit is https://github.com/apache/arrow/pull/1581. We
|
||||
# The PR for this commit is https://github.com/apache/arrow/pull/1880. We
|
||||
# include the link here to make it easier to find the right commit because
|
||||
# Arrow often rewrites git history and invalidates certain commits.
|
||||
git checkout 46aa99e9843ac0148357bb36a9235cfd48903e73
|
||||
git checkout 4009b62086dfa43a4fd8bfa714772716e6531c6f
|
||||
|
||||
# Revert https://github.com/apache/arrow/pull/1807, which unfortunately
|
||||
# introduces the issue in https://issues.apache.org/jira/browse/ARROW-2448.
|
||||
git revert --no-commit cf396867df6f1f93948c69ce10ceb0f95e399242
|
||||
|
||||
cd cpp
|
||||
if [ ! -d "build" ]; then
|
||||
|
||||
Vendored
+1
-1
@@ -15,7 +15,7 @@ if [ ! -d $TP_DIR/build/parquet-cpp ]; then
|
||||
git clone https://github.com/apache/parquet-cpp.git "$TP_DIR/build/parquet-cpp"
|
||||
pushd $TP_DIR/build/parquet-cpp
|
||||
git fetch origin master
|
||||
git checkout 76388ea4eb8b23656283116bc656b0c8f5db093b
|
||||
git checkout 0875e43010af485e1c0b506d77d7e0edc80c66cc
|
||||
|
||||
if [ "$unamestr" == "Darwin" ]; then
|
||||
OPENSSL_ROOT_DIR=$OPENSSL_DIR \
|
||||
|
||||
Reference in New Issue
Block a user