diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index 3e3192735..8f315be6a 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -6,10 +6,5 @@ from .dataframe import DataFrame from .dataframe import from_pandas from .dataframe import to_pandas from .series import Series -import ray -import pandas as pd __all__ = ["DataFrame", "from_pandas", "to_pandas", "Series"] - -ray.register_custom_serializer(pd.DataFrame, use_pickle=True) -ray.register_custom_serializer(pd.core.indexes.base.Index, use_pickle=True) diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 6cabbcfb5..030016da1 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -392,7 +392,7 @@ void finish_killed_task(LocalSchedulerState *state, int64_t num_returns = TaskSpec_num_returns(spec); for (int i = 0; i < num_returns; i++) { ObjectID object_id = TaskSpec_return(spec, i); - uint8_t *data = NULL; + std::shared_ptr data; // TODO(ekl): this writes an invalid arrow object, which is sufficient to // signal that the worker failed, but it would be nice to return more // detailed failure metadata in the future. diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index e7e648dbd..aeccfa5fd 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -798,14 +798,14 @@ void process_transfer_request(event_loop *loop, } } - DCHECK(object_buffer.metadata == - object_buffer.data + object_buffer.data_size); + CHECK(object_buffer.metadata->data() == + object_buffer.data->data() + object_buffer.data_size); PlasmaRequestBuffer *buf = new PlasmaRequestBuffer(); buf->type = MessageType_PlasmaDataReply; buf->object_id = obj_id; /* We treat buf->data as a pointer to the concatenated data and metadata, so * we don't actually use buf->metadata. */ - buf->data = object_buffer.data; + buf->data = const_cast(object_buffer.data->data()); buf->data_size = object_buffer.data_size; buf->metadata_size = object_buffer.metadata_size; @@ -839,8 +839,10 @@ void process_data_request(event_loop *loop, /* The corresponding call to plasma_release should happen in * process_data_chunk. */ + std::shared_ptr data; Status s = conn->manager_state->plasma_conn->Create( - object_id.to_plasma_id(), data_size, NULL, metadata_size, &(buf->data)); + object_id.to_plasma_id(), data_size, NULL, metadata_size, &data); + /* If success_create == true, a new object has been created. * If success_create == false the object creation has failed, possibly * due to an object with the same ID already existing in the Plasma Store. */ @@ -857,6 +859,7 @@ void process_data_request(event_loop *loop, event_loop_remove_file(loop, client_sock); event_loop_file_handler data_chunk_handler; if (s.ok()) { + buf->data = data->mutable_data(); data_chunk_handler = process_data_chunk; } else { /* Since plasma_create() has failed, we ignore the data transfer. We will diff --git a/src/plasma/test/client_tests.cc b/src/plasma/test/client_tests.cc index 39e25828d..6879b8d4a 100644 --- a/src/plasma/test/client_tests.cc +++ b/src/plasma/test/client_tests.cc @@ -32,7 +32,7 @@ TEST plasma_status_tests(void) { int64_t data_size = 100; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); - uint8_t *data; + std::shared_ptr data; ARROW_CHECK_OK( client1.Create(oid1, data_size, metadata, metadata_size, &data)); ARROW_CHECK_OK(client1.Seal(oid1)); @@ -73,7 +73,7 @@ TEST plasma_fetch_tests(void) { int64_t data_size = 100; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); - uint8_t *data; + std::shared_ptr data; ARROW_CHECK_OK( client1.Create(oid1, data_size, metadata, metadata_size, &data)); ARROW_CHECK_OK(client1.Seal(oid1)); @@ -116,7 +116,9 @@ void init_data_123(uint8_t *data, uint64_t size, uint8_t base) { } } -bool is_equal_data_123(uint8_t *data1, uint8_t *data2, uint64_t size) { +bool is_equal_data_123(const uint8_t *data1, + const uint8_t *data2, + uint64_t size) { for (size_t i = 0; i < size; i++) { if (data1[i] != data2[i]) { return false; @@ -142,14 +144,15 @@ TEST plasma_nonblocking_get_tests(void) { int64_t data_size = 4; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); - uint8_t *data; + std::shared_ptr data; ARROW_CHECK_OK(client.Create(oid, data_size, metadata, metadata_size, &data)); - init_data_123(data, data_size, 0); + init_data_123(data->mutable_data(), data_size, 0); ARROW_CHECK_OK(client.Seal(oid)); sleep(1); ARROW_CHECK_OK(client.Get(oid_array, 1, 0, &obj_buffer)); - ASSERT(is_equal_data_123(data, obj_buffer.data, data_size) == true); + ASSERT(is_equal_data_123(data->data(), obj_buffer.data->data(), data_size) == + true); sleep(1); ARROW_CHECK_OK(client.Disconnect()); @@ -191,7 +194,7 @@ TEST plasma_wait_for_objects_tests(void) { int64_t data_size = 4; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); - uint8_t *data; + std::shared_ptr data; ARROW_CHECK_OK( client1.Create(oid1, data_size, metadata, metadata_size, &data)); ARROW_CHECK_OK(client1.Seal(oid1)); @@ -245,23 +248,23 @@ TEST plasma_get_tests(void) { int64_t data_size = 4; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); - uint8_t *data; + std::shared_ptr data; ARROW_CHECK_OK( client1.Create(oid1, data_size, metadata, metadata_size, &data)); - init_data_123(data, data_size, 1); + init_data_123(data->mutable_data(), data_size, 1); ARROW_CHECK_OK(client1.Seal(oid1)); ARROW_CHECK_OK(client1.Get(oid_array1, 1, -1, &obj_buffer)); - ASSERT(data[0] == obj_buffer.data[0]); + ASSERT(data->data()[0] == obj_buffer.data->data()[0]); ARROW_CHECK_OK( client2.Create(oid2, data_size, metadata, metadata_size, &data)); - init_data_123(data, data_size, 2); + init_data_123(data->mutable_data(), data_size, 2); ARROW_CHECK_OK(client2.Seal(oid2)); ARROW_CHECK_OK(client1.Fetch(1, oid_array2)); ARROW_CHECK_OK(client1.Get(oid_array2, 1, -1, &obj_buffer)); - ASSERT(data[0] == obj_buffer.data[0]); + ASSERT(data->data()[0] == obj_buffer.data->data()[0]); sleep(1); ARROW_CHECK_OK(client1.Disconnect()); @@ -288,25 +291,25 @@ TEST plasma_get_multiple_tests(void) { int64_t data_size = 4; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); - uint8_t *data; + std::shared_ptr data; ARROW_CHECK_OK( client1.Create(oid1, data_size, metadata, metadata_size, &data)); - init_data_123(data, data_size, obj1_first); + init_data_123(data->mutable_data(), data_size, obj1_first); ARROW_CHECK_OK(client1.Seal(oid1)); /* This only waits for oid1. */ ARROW_CHECK_OK(client1.Get(obj_ids, 1, -1, obj_buffer)); - ASSERT(data[0] == obj_buffer[0].data[0]); + ASSERT(data->data()[0] == obj_buffer[0].data->data()[0]); ARROW_CHECK_OK( client2.Create(oid2, data_size, metadata, metadata_size, &data)); - init_data_123(data, data_size, obj2_first); + init_data_123(data->mutable_data(), data_size, obj2_first); ARROW_CHECK_OK(client2.Seal(oid2)); ARROW_CHECK_OK(client1.Fetch(2, obj_ids)); ARROW_CHECK_OK(client1.Get(obj_ids, 2, -1, obj_buffer)); - ASSERT(obj1_first == obj_buffer[0].data[0]); - ASSERT(obj2_first == obj_buffer[1].data[0]); + ASSERT(obj1_first == obj_buffer[0].data->data()[0]); + ASSERT(obj2_first == obj_buffer[1].data->data()[0]); sleep(1); ARROW_CHECK_OK(client1.Disconnect()); diff --git a/src/thirdparty/download_thirdparty.sh b/src/thirdparty/download_thirdparty.sh index 9772e9709..b9f12142f 100755 --- a/src/thirdparty/download_thirdparty.sh +++ b/src/thirdparty/download_thirdparty.sh @@ -13,4 +13,4 @@ fi cd $TP_DIR/arrow git fetch origin master -git checkout 16c79cc94e2440321bcad1ebbef53ea1266b94e8 +git checkout d135974a0d3dd9a9fbbb10da4c5dbc65f9324234