Convert task_spec to flatbuffers (#255)

* convert Ray to C++

* convert task_spec to flatbuffers

* fix

* it compiles

* latest

* tests are passing

* task2 -> task

* fix

* fix

* fix

* fix

* fix

* linting

* fix valgrind

* upgrade flatbuffers

* use debug mode for valgrind

* fix naming and comments

* downgrade flatbuffers

* fix linting

* reintroduce TaskSpec_free

* rename TaskSpec -> TaskInfo

* refactoring

* linting
This commit is contained in:
Philipp Moritz
2017-03-05 02:05:02 -08:00
committed by Robert Nishihara
parent 65a8659f3d
commit 0b8d279ef2
36 changed files with 1054 additions and 931 deletions
+44 -65
View File
@@ -33,6 +33,8 @@ void init_pickle_module(void) {
CHECK(pickle_protocol != NULL);
}
TaskBuilder *g_task_builder = NULL;
/* Define the PyObjectID class. */
int PyStringToUniqueID(PyObject *object, ObjectID *object_id) {
@@ -96,14 +98,10 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) {
}
PyTask *result = PyObject_New(PyTask, &PyTaskType);
result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType);
result->spec = (task_spec *) malloc(size);
result->size = size;
result->spec = (TaskSpec *) malloc(size);
memcpy(result->spec, data, size);
/* TODO(pcm): Better error checking once we use flatbuffers. */
if (size != task_spec_size(result->spec)) {
PyErr_SetString(CommonError,
"task_from_string: task specification string malformed");
return NULL;
}
/* TODO(pcm): Use flatbuffers validation here. */
return (PyObject *) result;
}
@@ -123,8 +121,7 @@ PyObject *PyTask_to_string(PyObject *self, PyObject *args) {
return NULL;
}
PyTask *task = (PyTask *) arg;
return PyBytes_FromStringAndSize((char *) task->spec,
task_spec_size(task->spec));
return PyBytes_FromStringAndSize((char *) task->spec, task->size);
}
static PyObject *PyObjectID_id(PyObject *self) {
@@ -271,9 +268,7 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
FunctionID function_id;
/* Arguments of the task (can be PyObjectIDs or Python values). */
PyObject *arguments;
/* Array of pointers to string representations of pass-by-value args. */
UT_array *val_repr_ptrs;
utarray_new(val_repr_ptrs, &ut_ptr_icd);
/* Number of return values of this task. */
int num_returns;
/* The ID of the task that called this task. */
TaskID parent_task_id;
@@ -289,101 +284,84 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
return -1;
}
Py_ssize_t size = PyList_Size(arguments);
/* Determine the size of pass by value data in bytes. */
Py_ssize_t value_data_bytes = 0;
for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *arg = PyList_GetItem(arguments, i);
if (!PyObject_IsInstance(arg, (PyObject *) &PyObjectIDType)) {
CHECK(pickle_module != NULL);
CHECK(pickle_dumps != NULL);
PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps,
arg, pickle_protocol, NULL);
value_data_bytes += PyBytes_Size(data);
utarray_push_back(val_repr_ptrs, &data);
}
}
/* Construct the task specification. */
int val_repr_index = 0;
self->spec = start_construct_task_spec(
driver_id, parent_task_id, parent_counter, actor_id, actor_counter,
function_id, size, num_returns, value_data_bytes);
TaskSpec_start_construct(g_task_builder, driver_id, parent_task_id,
parent_counter, actor_id, actor_counter, function_id,
num_returns);
/* Add the task arguments. */
for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *arg = PyList_GetItem(arguments, i);
if (PyObject_IsInstance(arg, (PyObject *) &PyObjectIDType)) {
task_args_add_ref(self->spec, ((PyObjectID *) arg)->object_id);
TaskSpec_args_add_ref(g_task_builder, ((PyObjectID *) arg)->object_id);
} else {
/* We do this check because we cast a signed int to an unsigned int. */
CHECK(val_repr_index >= 0);
PyObject *data = *((PyObject **) utarray_eltptr(
val_repr_ptrs, (uint64_t) val_repr_index));
task_args_add_val(self->spec, (uint8_t *) PyBytes_AS_STRING(data),
PyBytes_GET_SIZE(data));
PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps,
arg, pickle_protocol, NULL);
TaskSpec_args_add_val(g_task_builder, (uint8_t *) PyBytes_AS_STRING(data),
PyBytes_GET_SIZE(data));
Py_DECREF(data);
val_repr_index += 1;
}
}
utarray_free(val_repr_ptrs);
/* Set the resource vector of the task. */
if (resource_vector != NULL) {
CHECK(PyList_Size(resource_vector) == MAX_RESOURCE_INDEX);
for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) {
CHECK(PyList_Size(resource_vector) == ResourceIndex_MAX);
for (int i = 0; i < ResourceIndex_MAX; ++i) {
PyObject *resource_entry = PyList_GetItem(resource_vector, i);
task_spec_set_required_resource(self->spec, i,
PyFloat_AsDouble(resource_entry));
TaskSpec_set_required_resource(g_task_builder, i,
PyFloat_AsDouble(resource_entry));
}
} else {
for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) {
task_spec_set_required_resource(self->spec, i,
i == CPU_RESOURCE_INDEX ? 1.0 : 0.0);
for (int i = 0; i < ResourceIndex_MAX; ++i) {
TaskSpec_set_required_resource(g_task_builder, i,
i == ResourceIndex_CPU ? 1.0 : 0.0);
}
}
/* Compute the task ID and the return object IDs. */
finish_construct_task_spec(self->spec);
self->spec = TaskSpec_finish_construct(g_task_builder, &self->size);
return 0;
}
static void PyTask_dealloc(PyTask *self) {
if (self->spec != NULL) {
free_task_spec(self->spec);
TaskSpec_free(self->spec);
}
Py_TYPE(self)->tp_free((PyObject *) self);
}
static PyObject *PyTask_function_id(PyObject *self) {
FunctionID function_id = task_function(((PyTask *) self)->spec);
FunctionID function_id = TaskSpec_function(((PyTask *) self)->spec);
return PyObjectID_make(function_id);
}
static PyObject *PyTask_actor_id(PyObject *self) {
ActorID actor_id = task_spec_actor_id(((PyTask *) self)->spec);
ActorID actor_id = TaskSpec_actor_id(((PyTask *) self)->spec);
return PyObjectID_make(actor_id);
}
static PyObject *PyTask_driver_id(PyObject *self) {
UniqueID driver_id = task_spec_driver_id(((PyTask *) self)->spec);
UniqueID driver_id = TaskSpec_driver_id(((PyTask *) self)->spec);
return PyObjectID_make(driver_id);
}
static PyObject *PyTask_task_id(PyObject *self) {
TaskID task_id = task_spec_id(((PyTask *) self)->spec);
TaskID task_id = TaskSpec_task_id(((PyTask *) self)->spec);
return PyObjectID_make(task_id);
}
static PyObject *PyTask_arguments(PyObject *self) {
task_spec *task = ((PyTask *) self)->spec;
int64_t num_args = task_num_args(task);
TaskSpec *task = ((PyTask *) self)->spec;
int64_t num_args = TaskSpec_num_args(task);
PyObject *arg_list = PyList_New((Py_ssize_t) num_args);
for (int i = 0; i < num_args; ++i) {
if (task_arg_type(task, i) == ARG_BY_REF) {
ObjectID object_id = task_arg_id(task, i);
if (TaskSpec_arg_by_ref(task, i)) {
ObjectID object_id = TaskSpec_arg_id(task, i);
PyList_SetItem(arg_list, i, PyObjectID_make(object_id));
} else {
CHECK(pickle_module != NULL);
CHECK(pickle_loads != NULL);
PyObject *str =
PyBytes_FromStringAndSize((char *) task_arg_val(task, i),
(Py_ssize_t) task_arg_length(task, i));
PyBytes_FromStringAndSize((char *) TaskSpec_arg_val(task, i),
(Py_ssize_t) TaskSpec_arg_length(task, i));
PyObject *val =
PyObject_CallMethodObjArgs(pickle_module, pickle_loads, str, NULL);
Py_XDECREF(str);
@@ -394,21 +372,21 @@ static PyObject *PyTask_arguments(PyObject *self) {
}
static PyObject *PyTask_required_resources(PyObject *self) {
task_spec *task = ((PyTask *) self)->spec;
PyObject *required_resources = PyList_New((Py_ssize_t) MAX_RESOURCE_INDEX);
for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) {
double r = task_spec_get_required_resource(task, i);
TaskSpec *task = ((PyTask *) self)->spec;
PyObject *required_resources = PyList_New((Py_ssize_t) ResourceIndex_MAX);
for (int i = 0; i < ResourceIndex_MAX; ++i) {
double r = TaskSpec_get_required_resource(task, i);
PyList_SetItem(required_resources, i, PyFloat_FromDouble(r));
}
return required_resources;
}
static PyObject *PyTask_returns(PyObject *self) {
task_spec *task = ((PyTask *) self)->spec;
int64_t num_returns = task_num_returns(task);
TaskSpec *task = ((PyTask *) self)->spec;
int64_t num_returns = TaskSpec_num_returns(task);
PyObject *return_id_list = PyList_New((Py_ssize_t) num_returns);
for (int i = 0; i < num_returns; ++i) {
ObjectID object_id = task_return(task, i);
ObjectID object_id = TaskSpec_return(task, i);
PyList_SetItem(return_id_list, i, PyObjectID_make(object_id));
}
return return_id_list;
@@ -474,11 +452,12 @@ PyTypeObject PyTaskType = {
};
/* Create a PyTask from a C struct. The resulting PyTask takes ownership of the
* task_spec and will deallocate the task_spec in the PyTask destructor. */
PyObject *PyTask_make(task_spec *task_spec) {
* TaskSpec and will deallocate the TaskSpec in the PyTask destructor. */
PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) {
PyTask *result = PyObject_New(PyTask, &PyTaskType);
result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType);
result->spec = task_spec;
result->size = task_size;
return (PyObject *) result;
}