diff --git a/CMakeLists.txt b/CMakeLists.txt index 258226855..ce05bd9c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,7 +77,9 @@ if (UNIX AND NOT APPLE) endif() add_executable(objstore src/objstore.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) +target_link_libraries(objstore arrow) add_executable(scheduler src/scheduler.cc ${GENERATED_PROTOBUF_FILES}) -add_library(orchpylib SHARED src/orchpylib.cc src/worker.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) +add_library(orchpylib SHARED src/orchpylib.cc src/worker.cc src/ipc.cc src/serialize.cc ${GENERATED_PROTOBUF_FILES}) +target_link_libraries(orchpylib arrow) install(TARGETS objstore scheduler orchpylib DESTINATION ${CMAKE_SOURCE_DIR}/lib/orchpy/orchpy) diff --git a/include/orchestra/orchestra.h b/include/orchestra/orchestra.h index 65688d93e..cbd180037 100644 --- a/include/orchestra/orchestra.h +++ b/include/orchestra/orchestra.h @@ -55,7 +55,7 @@ public: }; struct slice { - char* data; + uint8_t* data; size_t len; }; diff --git a/lib/orchpy/orchpy/worker.py b/lib/orchpy/orchpy/worker.py index e20a6db7d..e71625796 100644 --- a/lib/orchpy/orchpy/worker.py +++ b/lib/orchpy/orchpy/worker.py @@ -1,5 +1,6 @@ from types import ModuleType import typing +import numpy as np import orchpy import serialization @@ -14,13 +15,19 @@ class Worker(object): def put_object(self, objref, value): """Put `value` in the local object store with objref `objref`. This assumes that the value for `objref` has not yet been placed in the local object store.""" - object_capsule = serialization.serialize(value) - orchpy.lib.put_object(self.handle, objref, object_capsule) + if type(value) == np.ndarray: + orchpy.lib.put_arrow(self.handle, objref, value) + else: + object_capsule = serialization.serialize(value) + orchpy.lib.put_object(self.handle, objref, object_capsule) def get_object(self, objref): """Return the value from the local object store for objref `objref`. This will block until the value for `objref` has been written to the local object store.""" - object_capsule = orchpy.lib.get_object(self.handle, objref) - return serialization.deserialize(object_capsule) + if orchpy.lib.is_arrow(self.handle, objref): + return orchpy.lib.get_arrow(self.handle, objref) + else: + object_capsule = orchpy.lib.get_object(self.handle, objref) + return serialization.deserialize(object_capsule) def register_function(self, function): """Notify the scheduler that this worker can execute the function with name `func_name`. Store the function `function` locally.""" @@ -53,12 +60,13 @@ def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker): worker.connected = True def pull(objref, worker=global_worker): - object_capsule = orchpy.lib.pull_object(worker.handle, objref) - return serialization.deserialize(object_capsule) + orchpy.lib.request_object(worker.handle, objref) + return worker.get_object(objref) def push(value, worker=global_worker): - object_capsule = serialization.serialize(value) - return orchpy.lib.push_object(worker.handle, object_capsule) + objref = orchpy.lib.get_objref(worker.handle) + worker.put_object(objref, value) + return objref def main_loop(worker=global_worker): if not worker.connected: diff --git a/protos/orchestra.proto b/protos/orchestra.proto index 35cb79a45..6eb7d90c0 100644 --- a/protos/orchestra.proto +++ b/protos/orchestra.proto @@ -32,7 +32,7 @@ service Scheduler { // Request an object reference for an object that will be pushed to an object store rpc PushObj(PushObjRequest) returns (PushObjReply); // Request delivery of an object from an object store that holds the object to the local object store - rpc PullObj(PullObjRequest) returns (AckReply); + rpc RequestObj(RequestObjRequest) returns (AckReply); // Used by an object store to tell the scheduler that an object is ready (i.e. has been finalized and can be shared) rpc ObjReady(ObjReadyRequest) returns (AckReply); // Used by the worker to report back and ask for more work @@ -75,9 +75,9 @@ message RemoteCallReply { repeated uint64 result = 1; // Object references of the function return values } -message PullObjRequest { - uint64 workerid = 1; // Worker that tries to pull the object - uint64 objref = 2; // Object reference of the object being pulled +message RequestObjRequest { + uint64 workerid = 1; // Worker that tries to request the object + uint64 objref = 2; // Object reference of the object being requested } message PushObjRequest { diff --git a/src/ipc.cc b/src/ipc.cc index 159f6919a..193698aa0 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -1,9 +1,31 @@ #include "ipc.h" -ObjHandle::ObjHandle(SegmentId segmentid, size_t size, IpcPointer ipcpointer) - : segmentid_(segmentid), size_(size), ipcpointer_(ipcpointer) +using namespace arrow; + +ObjHandle::ObjHandle(SegmentId segmentid, size_t size, IpcPointer ipcpointer, size_t metadata_offset) + : segmentid_(segmentid), size_(size), ipcpointer_(ipcpointer), metadata_offset_(metadata_offset) {} +Status BufferMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) { + // TODO(pcm): error handling + std::memcpy(data_ + position, data, nbytes); + return Status::OK(); +} + +Status BufferMemorySource::ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) { + // TODO(pcm): error handling + *out = std::make_shared(data_ + position, nbytes); + return Status::OK(); +} + +Status BufferMemorySource::Close() { + return Status::OK(); +} + +int64_t BufferMemorySource::Size() const { + return size_; +} + MemorySegmentPool::MemorySegmentPool(bool create) : create_mode_(create) { } // creates a memory segment if it is not already there; if the pool is in create mode, @@ -37,14 +59,14 @@ ObjHandle MemorySegmentPool::allocate(size_t size) { // returns address of the object refered to by the handle, needs to be called on // the process that will use the address -char* MemorySegmentPool::get_address(ObjHandle pointer) { +uint8_t* MemorySegmentPool::get_address(ObjHandle pointer) { if (pointer.segmentid() >= segments_.size()) { for (int i = segments_.size(); i <= pointer.segmentid(); ++i) { open_segment(i); } } managed_shared_memory* segment = segments_[pointer.segmentid()].get(); - return static_cast(segment->get_address_from_handle(pointer.ipcpointer())); + return static_cast(segment->get_address_from_handle(pointer.ipcpointer())); } MemorySegmentPool::~MemorySegmentPool() { diff --git a/src/ipc.h b/src/ipc.h index 9e2e764f2..7066793f3 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -6,6 +6,9 @@ #include #include +#include +#include + #include "orchestra/orchestra.h" using namespace boost::interprocess; @@ -91,6 +94,7 @@ struct ObjRequest { ObjRequestType type; // do we want to allocate a new object or get a handle? ObjRef objref; // object reference of the object to be returned/allocated int64_t size; // if allocate, that's the size of the object + int64_t metadata_offset; // if sending 'DONE', that's the location of the metadata relative to the beginning of the object }; typedef size_t SegmentId; // index into a memory segment table @@ -101,14 +105,30 @@ typedef managed_shared_memory::handle_t IpcPointer; class ObjHandle { public: - ObjHandle(SegmentId segmentid = 0, size_t size = 0, IpcPointer ipcpointer = IpcPointer()); + ObjHandle(SegmentId segmentid = 0, size_t size = 0, IpcPointer ipcpointer = IpcPointer(), size_t metadata_offset = 0); SegmentId segmentid() { return segmentid_; } size_t size() { return size_; } IpcPointer ipcpointer() { return ipcpointer_; } + size_t metadata_offset() { return metadata_offset_; } + void set_metadata_offset(size_t metadata_offset) {metadata_offset_ = metadata_offset; } private: - SegmentId segmentid_; - size_t size_; - IpcPointer ipcpointer_; + SegmentId segmentid_; // which shared memory file the object is stored in + IpcPointer ipcpointer_; // pointer to the beginning of the object, exchangeable between processes + size_t size_; // total size of the object + size_t metadata_offset_; // offset of the metadata that describes this object +}; + +class BufferMemorySource: public arrow::ipc::MemorySource { +public: + BufferMemorySource(uint8_t* data, int64_t capacity) : data_(data), capacity_(capacity), size_(0) {} + virtual arrow::Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out); + virtual arrow::Status Close(); + virtual arrow::Status Write(int64_t position, const uint8_t* data, int64_t nbytes); + virtual int64_t Size() const; + private: + uint8_t* data_; + int64_t capacity_; + int64_t size_; }; // Memory segment pool: A collection of shared memory segments @@ -123,8 +143,8 @@ class MemorySegmentPool { public: MemorySegmentPool(bool create = false); // can be used in two modes: create mode and open mode (see above) ~MemorySegmentPool(); - ObjHandle allocate(size_t nbytes); // allocate a new shared object, potentially creating a new segment (only run on object store) - char* get_address(ObjHandle pointer); // get address of shared object + ObjHandle allocate(size_t nbytes); // allocate memory, potentially creating a new segment (only run on object store) + uint8_t* get_address(ObjHandle pointer); // get address of shared object private: void open_segment(SegmentId segmentid, size_t size = 0); // create a segment or map an existing one into memory bool create_mode_; diff --git a/src/objstore.cc b/src/objstore.cc index 429fa45cb..2ddaa9a10 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -9,7 +9,7 @@ Status ObjStoreClient::upload_data_to(slice data, ObjRef objref, ObjStore::Stub& ClientContext context; AckReply reply; std::unique_ptr > writer(stub.StreamObj(&context, &reply)); - const char* head = data.data; + const uint8_t* head = data.data; for (size_t i = 0; i < data.len; i += CHUNK_SIZE) { chunk.set_objref(objref); chunk.set_totalsize(data.len); @@ -150,6 +150,7 @@ void ObjStoreService::process_requests() { break; case ObjRequestType::DONE: { std::pair& item = memory_[request.objref]; + item.first.set_metadata_offset(request.metadata_offset); item.second = true; std::lock_guard pull_queue_lock(pull_queue_lock_); for (size_t i = 0; i < pull_queue_.size(); ++i) { diff --git a/src/orchpylib.cc b/src/orchpylib.cc index 293e1fb79..1fc194c2a 100644 --- a/src/orchpylib.cc +++ b/src/orchpylib.cc @@ -4,12 +4,16 @@ #include #include +#define PY_ARRAY_UNIQUE_SYMBOL ORCHESTRA_ARRAY_API #include +#include #include #include "types.pb.h" #include "worker.h" +#include "serialize.h" + extern "C" { // Object references @@ -230,6 +234,13 @@ int serialize(PyObject* val, Obj* obj) { } } break; + case NPY_INT64: { + npy_int64* buffer = (npy_int64*) PyArray_DATA(array); + for (npy_intp i = 0; i < size; ++i) { + data->add_int_data(buffer[i]); + } + } + break; case NPY_UINT8: { npy_uint8* buffer = (npy_uint8*) PyArray_DATA(array); for (npy_intp i = 0; i < size; ++i) { @@ -237,6 +248,13 @@ int serialize(PyObject* val, Obj* obj) { } } break; + case NPY_UINT64: { + npy_uint64* buffer = (npy_uint64*) PyArray_DATA(array); + for (npy_intp i = 0; i < size; ++i) { + data->add_uint_data(buffer[i]); + } + } + break; case NPY_OBJECT: { // FIXME(pcm): Support arbitrary python objects, not only objrefs PyArrayIterObject* iter = (PyArrayIterObject*) PyArray_IterNew((PyObject*)array); while (PyArray_ITER_NOTDONE(iter)) { @@ -327,6 +345,13 @@ PyObject* deserialize(const Obj& obj) { } } break; + case NPY_INT64: { + npy_int64* buffer = (npy_int64*) PyArray_DATA(pyarray); + for (npy_intp i = 0; i < size; ++i) { + buffer[i] = array.int_data(i); + } + } + break; default: PyErr_SetString(OrchPyError, "deserialization: internal error (array type not implemented)"); return NULL; @@ -341,6 +366,13 @@ PyObject* deserialize(const Obj& obj) { } } break; + case NPY_UINT64: { + npy_uint64* buffer = (npy_uint64*) PyArray_DATA(pyarray); + for (npy_intp i = 0; i < size; ++i) { + buffer[i] = array.uint_data(i); + } + } + break; default: PyErr_SetString(OrchPyError, "deserialization: internal error (array type not implemented)"); return NULL; @@ -374,6 +406,43 @@ PyObject* serialize_object(PyObject* self, PyObject* args) { return PyCapsule_New(static_cast(obj), "obj", NULL); } +PyObject* put_arrow(PyObject* self, PyObject* args) { + Worker* worker; + ObjRef objref; + PyObject* value; + if (!PyArg_ParseTuple(args, "O&O&O", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref, &value)) { + return NULL; + } + if (!PyArray_Check(value)) { + PyErr_SetString(PyExc_TypeError, "only support arrays at this point"); + return NULL; + } + PyArrayObject* array = PyArray_GETCONTIGUOUS((PyArrayObject*) value); + worker->put_arrow(objref, array); + Py_RETURN_NONE; +} + +PyObject* get_arrow(PyObject* self, PyObject* args) { + Worker* worker; + ObjRef objref; + if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref)) { + return NULL; + } + return (PyObject*) worker->get_arrow(objref); +} + +PyObject* is_arrow(PyObject* self, PyObject* args) { + Worker* worker; + ObjRef objref; + if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref)) { + return NULL; + } + if (worker->is_arrow(objref)) + Py_RETURN_TRUE; + else + Py_RETURN_FALSE; +} + PyObject* deserialize_object(PyObject* self, PyObject* args) { Obj* obj; if (!PyArg_ParseTuple(args, "O&", &PyObjectToObj, &obj)) { @@ -496,13 +565,12 @@ PyObject* register_function(PyObject* self, PyObject* args) { Py_RETURN_NONE; } -PyObject* push_object(PyObject* self, PyObject* args) { +PyObject* get_objref(PyObject* self, PyObject* args) { Worker* worker; - Obj* obj; - if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObj, &obj)) { + if (!PyArg_ParseTuple(args, "O&", &PyObjectToWorker, &worker)) { return NULL; } - ObjRef objref = worker->push_object(obj); + ObjRef objref = worker->get_objref(); return make_pyobjref(objref); } @@ -525,20 +593,18 @@ PyObject* get_object(PyObject* self, PyObject* args) { } slice s = worker->get_object(objref); Obj* obj = new Obj(); // TODO: Make sure this will get deleted - obj->ParseFromString(std::string(s.data, s.len)); + obj->ParseFromString(std::string(reinterpret_cast(s.data), s.len)); return PyCapsule_New(static_cast(obj), "obj", NULL); } -PyObject* pull_object(PyObject* self, PyObject* args) { +PyObject* request_object(PyObject* self, PyObject* args) { Worker* worker; ObjRef objref; if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref)) { return NULL; } - slice s = worker->pull_object(objref); - Obj* obj = new Obj(); // TODO: Make sure this will get deleted - obj->ParseFromString(std::string(s.data, s.len)); - return PyCapsule_New(static_cast(obj), "obj", NULL); + worker->request_object(objref); + Py_RETURN_NONE; } PyObject* start_worker_service(PyObject* self, PyObject* args) { @@ -553,14 +619,17 @@ PyObject* start_worker_service(PyObject* self, PyObject* args) { static PyMethodDef OrchPyLibMethods[] = { { "serialize_object", serialize_object, METH_VARARGS, "serialize an object to protocol buffers" }, { "deserialize_object", deserialize_object, METH_VARARGS, "deserialize an object from protocol buffers" }, + { "put_arrow", put_arrow, METH_VARARGS, "put an arrow array on the local object store"}, + { "get_arrow", get_arrow, METH_VARARGS, "get an arrow array from the local object store"}, + { "is_arrow", is_arrow, METH_VARARGS, "is the object in the local object store an arrow object?"}, { "serialize_call", serialize_call, METH_VARARGS, "serialize a call to protocol buffers" }, { "deserialize_call", deserialize_call, METH_VARARGS, "deserialize a call from protocol buffers" }, { "create_worker", create_worker, METH_VARARGS, "connect to the scheduler and the object store" }, { "register_function", register_function, METH_VARARGS, "register a function with the scheduler" }, { "put_object", put_object, METH_VARARGS, "put a protocol buffer object (given as a capsule) on the local object store" }, { "get_object", get_object, METH_VARARGS, "get protocol buffer object from the local object store" }, - { "push_object", push_object, METH_VARARGS, "push a protocol buffer object (given as a capsule) to the object store" }, - { "pull_object" , pull_object, METH_VARARGS, "pull object with a given object id from the object store" }, + { "get_objref", get_objref, METH_VARARGS, "register a new object reference with the scheduler" }, + { "request_object" , request_object, METH_VARARGS, "request an object to be delivered to the local object store" }, { "wait_for_next_task", wait_for_next_task, METH_VARARGS, "get next task from scheduler (blocking)" }, { "remote_call", remote_call, METH_VARARGS, "call a remote function" }, { "notify_task_completed", notify_task_completed, METH_VARARGS, "notify the scheduler that a task has been completed" }, diff --git a/src/scheduler.cc b/src/scheduler.cc index dcc20e2b8..aca637400 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -38,7 +38,7 @@ Status SchedulerService::PushObj(ServerContext* context, const PushObjRequest* r return Status::OK; } -Status SchedulerService::PullObj(ServerContext* context, const PullObjRequest* request, AckReply* reply) { +Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) { objtable_lock_.lock(); size_t size = objtable_.size(); objtable_lock_.unlock(); @@ -242,6 +242,9 @@ ObjRef SchedulerService::register_new_object() { void SchedulerService::add_location(ObjRef objref, ObjStoreId objstoreid) { std::lock_guard objtable_lock(objtable_lock_); + if (objref >= objtable_.size()) { + ORCH_LOG(ORCH_FATAL, "trying to put object on object store that was not registered with the scheduler"); + } // do a binary search auto pos = std::lower_bound(objtable_[objref].begin(), objtable_[objref].end(), objstoreid); if (pos == objtable_[objref].end() || objstoreid < *pos) { diff --git a/src/scheduler.h b/src/scheduler.h index afe6ccb0c..ec677e8f4 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -39,7 +39,7 @@ class SchedulerService : public Scheduler::Service { public: Status RemoteCall(ServerContext* context, const RemoteCallRequest* request, RemoteCallReply* reply) override; Status PushObj(ServerContext* context, const PushObjRequest* request, PushObjReply* reply) override; - Status PullObj(ServerContext* context, const PullObjRequest* request, AckReply* reply) override; + Status RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) override; Status RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) override; Status RegisterWorker(ServerContext* context, const RegisterWorkerRequest* request, RegisterWorkerReply* reply) override; Status RegisterFunction(ServerContext* context, const RegisterFunctionRequest* request, AckReply* reply) override; diff --git a/src/serialize.cc b/src/serialize.cc new file mode 100644 index 000000000..c564eae6f --- /dev/null +++ b/src/serialize.cc @@ -0,0 +1,201 @@ +#include "serialize.h" + +using namespace arrow; + +template +struct npy_traits { +}; + +template <> +struct npy_traits { + typedef uint8_t value_type; + static const std::shared_ptr primitive_type; + using ArrayType = arrow::BooleanArray; +}; + +const std::shared_ptr npy_traits::primitive_type = std::make_shared(); + +#define NPY_INT_DECL(TYPE, CapType, T) \ + template <> \ + struct npy_traits { \ + typedef T value_type; \ + static const std::shared_ptr primitive_type; \ + using ArrayType = arrow::CapType##Array; \ + }; \ + \ + const std::shared_ptr npy_traits::primitive_type = std::make_shared(); + +NPY_INT_DECL(INT8, Int8, int8_t); +NPY_INT_DECL(INT16, Int16, int16_t); +NPY_INT_DECL(INT32, Int32, int32_t); +NPY_INT_DECL(INT64, Int64, int64_t); +NPY_INT_DECL(UINT8, UInt8, uint8_t); +NPY_INT_DECL(UINT16, UInt16, uint16_t); +NPY_INT_DECL(UINT32, UInt32, uint32_t); +NPY_INT_DECL(UINT64, UInt64, uint64_t); + +template <> +struct npy_traits { + typedef float value_type; + static const std::shared_ptr primitive_type; + using ArrayType = arrow::FloatArray; +}; + +const std::shared_ptr npy_traits::primitive_type = std::make_shared(); + +template <> +struct npy_traits { + typedef double value_type; + static const std::shared_ptr primitive_type; + using ArrayType = arrow::DoubleArray; +}; + +const std::shared_ptr npy_traits::primitive_type = std::make_shared(); + +template <> +struct npy_traits { + typedef PyObject* value_type; +}; + +template +std::shared_ptr make_flat_array(const std::string& fieldname, size_t size, std::shared_ptr data) { + auto field = std::make_shared(fieldname, npy_traits::primitive_type); + std::shared_ptr schema(new arrow::Schema({field})); + auto array = std::make_shared::ArrayType>(size, data); + return std::shared_ptr(new RowBatch(schema, size, {array})); +} + +const int64_t MAX_METADATA_SIZE = 5000; + +#define SIZE_ARROW_CASE(TYPE) \ + case TYPE: \ + return size * sizeof(npy_traits::value_type) + MAX_METADATA_SIZE; + +size_t arrow_size(PyArrayObject* array) { + npy_intp size = PyArray_SIZE(array); + switch (PyArray_TYPE(array)) { + SIZE_ARROW_CASE(NPY_INT8) + SIZE_ARROW_CASE(NPY_INT16) + SIZE_ARROW_CASE(NPY_INT32) + SIZE_ARROW_CASE(NPY_INT64) + SIZE_ARROW_CASE(NPY_UINT8) + SIZE_ARROW_CASE(NPY_UINT16) + SIZE_ARROW_CASE(NPY_UINT32) + SIZE_ARROW_CASE(NPY_UINT64) + SIZE_ARROW_CASE(NPY_FLOAT) + SIZE_ARROW_CASE(NPY_DOUBLE) + default: + ORCH_LOG(ORCH_FATAL, "serialization: numpy datatype not know"); + } +} + +#define SERIALIZE_ARROW_CASE(TYPE) \ + case TYPE: \ + { \ + data = std::make_shared(reinterpret_cast(PyArray_DATA(array)), sizeof(npy_traits::value_type) * size); \ + batch_size = size * sizeof(npy_traits::value_type) + MAX_METADATA_SIZE; \ + batch = make_flat_array("data", size, data); \ + } \ + break; + +// TODO(pcm): At the moment, this assumes that arrays are consecutive in memory +void store_arrow(PyArrayObject* array, ObjHandle& location, MemorySegmentPool* pool) { + npy_intp size = PyArray_SIZE(array); + std::shared_ptr data; + std::shared_ptr batch; + int64_t batch_size = 0; + switch (PyArray_TYPE(array)) { + SERIALIZE_ARROW_CASE(NPY_INT8) + SERIALIZE_ARROW_CASE(NPY_INT16) + SERIALIZE_ARROW_CASE(NPY_INT32) + SERIALIZE_ARROW_CASE(NPY_INT64) + SERIALIZE_ARROW_CASE(NPY_UINT8) + SERIALIZE_ARROW_CASE(NPY_UINT16) + SERIALIZE_ARROW_CASE(NPY_UINT32) + SERIALIZE_ARROW_CASE(NPY_UINT64) + SERIALIZE_ARROW_CASE(NPY_FLOAT) + SERIALIZE_ARROW_CASE(NPY_DOUBLE) + default: + ORCH_LOG(ORCH_FATAL, "serialization: numpy datatype not know"); + } + + // int64_t data_batch_size = ipc::GetRowBatchSize(batch.get()); // FIXME(pcm): once GetRowBatchSize is implemented, use it + + size_t ndim = PyArray_NDIM(array); + MemoryPool* default_pool = arrow::default_memory_pool(); + + auto metadata = std::make_shared(default_pool); + size_t metadata_size = 1 + ndim + 1; // dtype, list of shapes, pointer to header of the data segment + metadata->Resize(metadata_size * sizeof(int64_t)); + + int64_t* buffer = reinterpret_cast(metadata->mutable_data()); + buffer[0] = PyArray_TYPE(array); + // serialize the shape information + for (size_t i = 0; i < ndim; ++i) { + buffer[i+1] = PyArray_DIM(array, i); + } + std::shared_ptr metadata_batch = make_flat_array("metadata", metadata_size, metadata); + + // int64_t metadata_batch_size = ipc::GetRowBatchSize(metadata_batch.get()); // FIXME(pcm): once GetRowBatchSize is implemented, use it + + uint8_t* address = pool->get_address(location); + auto source = std::make_shared(address, location.size()); + + int64_t data_header_offset = 0; + ipc::WriteRowBatch(source.get(), batch.get(), 0, &data_header_offset); + + buffer[1 + ndim] = data_header_offset; + + int64_t metadata_header_offset = 0; + ipc::WriteRowBatch(source.get(), metadata_batch.get(), location.size() + MAX_METADATA_SIZE/2, &metadata_header_offset); + location.set_metadata_offset(metadata_header_offset); +} + +template +std::shared_ptr read_flat_array(BufferMemorySource* source, int64_t metadata_offset) { + std::shared_ptr reader; + Status s = ipc::RowBatchReader::Open(source, metadata_offset, &reader); + if (!s.ok()) { + ORCH_LOG(ORCH_FATAL, s.ToString()); + } + auto field = std::make_shared("data", npy_traits::primitive_type); + std::shared_ptr schema(new arrow::Schema({field})); + std::shared_ptr data; + reader->GetRowBatch(schema, &data); + return data->column(0); + +} + +#define DESERIALIZE_ARROW_CASE(TYPE) \ + case TYPE: \ + { \ + auto array = read_flat_array(source.get(), buffer[metadata_array->length()-1]); \ + auto data_primitive_array = dynamic_cast::ArrayType*>(array.get()); \ + return PyArray_SimpleNewFromData(dims.size(), &dims[0], TYPE, (void*)data_primitive_array->raw_data()); \ + } + +PyObject* deserialize_array(ObjHandle handle, MemorySegmentPool* pool) { + auto source = std::make_shared(pool->get_address(handle), handle.size()); + auto metadata_array = read_flat_array(source.get(), handle.metadata_offset()); + const uint64_t* buffer = dynamic_cast(metadata_array.get())->raw_data(); + uint64_t type = buffer[0]; + std::vector dims; + for (int i = 1; i < metadata_array->length()-1; ++i) { + dims.push_back(buffer[i]); + } + + switch (type) { + DESERIALIZE_ARROW_CASE(NPY_INT8) + DESERIALIZE_ARROW_CASE(NPY_INT16) + DESERIALIZE_ARROW_CASE(NPY_INT32) + DESERIALIZE_ARROW_CASE(NPY_INT64) + DESERIALIZE_ARROW_CASE(NPY_UINT8) + DESERIALIZE_ARROW_CASE(NPY_UINT16) + DESERIALIZE_ARROW_CASE(NPY_UINT32) + DESERIALIZE_ARROW_CASE(NPY_UINT64) + DESERIALIZE_ARROW_CASE(NPY_FLOAT) + DESERIALIZE_ARROW_CASE(NPY_DOUBLE) + default: + ORCH_LOG(ORCH_FATAL, "deserialization: numpy datatype not know"); + } +} diff --git a/src/serialize.h b/src/serialize.h new file mode 100644 index 000000000..efe01eb35 --- /dev/null +++ b/src/serialize.h @@ -0,0 +1,21 @@ +#ifndef ORCHESTRA_SERIALIZE_H +#define ORCHESTRA_SERIALIZE_H + +#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION + +#include +#include +#include +#include +#define NO_IMPORT_ARRAY +#define PY_ARRAY_UNIQUE_SYMBOL ORCHESTRA_ARRAY_API +#include +#include + +#include "ipc.h" + +size_t arrow_size(PyArrayObject* array); +void store_arrow(PyArrayObject* array, ObjHandle& location, MemorySegmentPool* pool); +PyObject* deserialize_array(ObjHandle handle, MemorySegmentPool* pool); + +#endif diff --git a/src/worker.cc b/src/worker.cc index 0375896e7..9b0ba87a0 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -1,4 +1,4 @@ -# include "worker.h" +#include "worker.h" Status WorkerServiceImpl::InvokeCall(ServerContext* context, const InvokeCallRequest* request, InvokeCallReply* reply) { call_ = request->call(); // Copy call @@ -36,26 +36,23 @@ void Worker::register_worker(const std::string& worker_address, const std::strin return; } -slice Worker::pull_object(ObjRef objref) { - PullObjRequest request; +void Worker::request_object(ObjRef objref) { + RequestObjRequest request; request.set_workerid(workerid_); request.set_objref(objref); AckReply reply; ClientContext context; - Status status = scheduler_stub_->PullObj(&context, request, &reply); - return get_object(objref); + Status status = scheduler_stub_->RequestObj(&context, request, &reply); + return; } -ObjRef Worker::push_object(const Obj* obj) { +ObjRef Worker::get_objref() { // first get objref for the new object PushObjRequest push_request; PushObjReply push_reply; ClientContext push_context; Status push_status = scheduler_stub_->PushObj(&push_context, push_request, &push_reply); - ObjRef objref = push_reply.objref(); - // then stream the object to the object store - put_object(objref, obj); - return objref; + return push_reply.objref(); } slice Worker::get_object(ObjRef objref) { @@ -84,12 +81,51 @@ void Worker::put_object(ObjRef objref, const Obj* obj) { request_obj_queue_.send(&request); ObjHandle result; receive_obj_queue_.receive(&result); - char* target = segmentpool_.get_address(result); + uint8_t* target = segmentpool_.get_address(result); std::memcpy(target, &data[0], data.size()); request.type = ObjRequestType::DONE; + request.metadata_offset = 0; request_obj_queue_.send(&request); } +void Worker::put_arrow(ObjRef objref, PyArrayObject* array) { + ObjRequest request; + size_t size = arrow_size(array); + request.workerid = workerid_; + request.type = ObjRequestType::ALLOC; + request.objref = objref; + request.size = size; + request_obj_queue_.send(&request); + ObjHandle result; + receive_obj_queue_.receive(&result); + store_arrow(array, result, &segmentpool_); + request.type = ObjRequestType::DONE; + request.metadata_offset = result.metadata_offset(); + request_obj_queue_.send(&request); +} + +PyArrayObject* Worker::get_arrow(ObjRef objref) { + ObjRequest request; + request.workerid = workerid_; + request.type = ObjRequestType::GET; + request.objref = objref; + request_obj_queue_.send(&request); + ObjHandle result; + receive_obj_queue_.receive(&result); + return (PyArrayObject*)deserialize_array(result, &segmentpool_); +} + +bool Worker::is_arrow(ObjRef objref) { + ObjRequest request; + request.workerid = workerid_; + request.type = ObjRequestType::GET; + request.objref = objref; + request_obj_queue_.send(&request); + ObjHandle result; + receive_obj_queue_.receive(&result); + return result.metadata_offset() != 0; +} + void Worker::register_function(const std::string& name, size_t num_return_vals) { ClientContext context; RegisterFunctionRequest request; diff --git a/src/worker.h b/src/worker.h index 7e3905275..5f5429949 100644 --- a/src/worker.h +++ b/src/worker.h @@ -16,6 +16,7 @@ using grpc::Status; #include "orchestra.grpc.pb.h" #include "orchestra/orchestra.h" #include "ipc.h" +#include "serialize.h" using grpc::Channel; using grpc::ClientContext; @@ -42,14 +43,21 @@ class Worker { RemoteCallReply remote_call(RemoteCallRequest* request); // send request to the scheduler to register this worker void register_worker(const std::string& worker_address, const std::string& objstore_address); - // push object to local object store, register it with the server and return object reference - ObjRef push_object(const Obj* obj); - // pull object from a potentially remote object store - slice pull_object(ObjRef objref); + // get a new object reference that is registered with the scheduler + ObjRef get_objref(); + // request an object to be delivered to the local object store + void request_object(ObjRef objref); // stores an object to the local object store void put_object(ObjRef objref, const Obj* obj); // retrieve serialized object from local object store slice get_object(ObjRef objref); + // stores an arrow object to the local object store + // FIXME(pcm): Once we have structs in arrow, get rid of the memcpy here + void put_arrow(ObjRef objref, PyArrayObject* array); + // gets an arrow object from the local object store + PyArrayObject* get_arrow(ObjRef objref); + // determine if the object stored in objref is an arrow object // TODO(pcm): more general mechanism for this? + bool is_arrow(ObjRef objref); // register function with scheduler void register_function(const std::string& name, size_t num_return_vals); // start the worker server which accepts tasks from the scheduler and stores