mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 03:13:07 +08:00
propagate callback error
This commit is contained in:
@@ -117,7 +117,7 @@ Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder,
|
||||
PyObject* result = PyObject_CallObject(numbuf_serialize_callback, arglist);
|
||||
if (!result) {
|
||||
Py_XDECREF(arglist);
|
||||
return python_error_to_status();
|
||||
return Status::NotImplemented("python error"); // TODO(pcm): https://github.com/pcmoritz/numbuf/issues/10
|
||||
}
|
||||
builder.AppendDict(PyDict_Size(result));
|
||||
subdicts.push_back(result);
|
||||
|
||||
@@ -11,56 +11,49 @@ extern PyObject* numbuf_deserialize_callback;
|
||||
|
||||
namespace numbuf {
|
||||
|
||||
PyObject* get_value(ArrayPtr arr, int32_t index, int32_t type, PyObject* base) {
|
||||
PyObject* result;
|
||||
Status get_value(ArrayPtr arr, int32_t index, int32_t type, PyObject* base, PyObject** result) {
|
||||
switch (arr->type()->type) {
|
||||
case Type::BOOL:
|
||||
return PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
|
||||
*result = PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
|
||||
return Status::OK();
|
||||
case Type::INT64:
|
||||
return PyInt_FromLong(std::static_pointer_cast<Int64Array>(arr)->Value(index));
|
||||
*result = PyInt_FromLong(std::static_pointer_cast<Int64Array>(arr)->Value(index));
|
||||
return Status::OK();
|
||||
case Type::BINARY: {
|
||||
int32_t nchars;
|
||||
const uint8_t* str = std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars);
|
||||
return PyString_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
|
||||
*result = PyString_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
|
||||
return Status::OK();
|
||||
}
|
||||
case Type::STRING: {
|
||||
int32_t nchars;
|
||||
const uint8_t* str = std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars);
|
||||
return PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
|
||||
*result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
|
||||
return Status::OK();
|
||||
}
|
||||
case Type::FLOAT:
|
||||
return PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
|
||||
*result = PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
|
||||
return Status::OK();
|
||||
case Type::DOUBLE:
|
||||
return PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
|
||||
*result = PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
|
||||
return Status::OK();
|
||||
case Type::STRUCT: {
|
||||
auto s = std::static_pointer_cast<StructArray>(arr);
|
||||
auto l = std::static_pointer_cast<ListArray>(s->field(0));
|
||||
if (s->type()->child(0)->name == "list") {
|
||||
ARROW_CHECK_OK(DeserializeList(l->values(), l->value_offset(index), l->value_offset(index+1), base, &result));
|
||||
return DeserializeList(l->values(), l->value_offset(index), l->value_offset(index+1), base, result);
|
||||
} else if (s->type()->child(0)->name == "tuple") {
|
||||
ARROW_CHECK_OK(DeserializeTuple(l->values(), l->value_offset(index), l->value_offset(index+1), base, &result));
|
||||
return DeserializeTuple(l->values(), l->value_offset(index), l->value_offset(index+1), base, result);
|
||||
} else if (s->type()->child(0)->name == "dict") {
|
||||
ARROW_CHECK_OK(DeserializeDict(l->values(), l->value_offset(index), l->value_offset(index+1), base, &result));
|
||||
return DeserializeDict(l->values(), l->value_offset(index), l->value_offset(index+1), base, result);
|
||||
} else {
|
||||
ARROW_CHECK_OK(DeserializeArray(arr, index, base, &result));
|
||||
return DeserializeArray(arr, index, base, result);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
default:
|
||||
DCHECK(false) << "union tag not recognized " << type;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Status python_error_to_status() {
|
||||
PyObject *type, *value, *traceback;
|
||||
PyErr_Fetch(&type, &value, &traceback);
|
||||
char *err_message = PyString_AsString(value);
|
||||
std::stringstream ss;
|
||||
if (err_message) {
|
||||
ss << "Python error in callback: " << err_message;
|
||||
}
|
||||
return Status::NotImplemented(ss.str());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status append(PyObject* elem, SequenceBuilder& builder,
|
||||
@@ -123,7 +116,7 @@ Status append(PyObject* elem, SequenceBuilder& builder,
|
||||
PyObject* result = PyObject_CallObject(numbuf_serialize_callback, arglist);
|
||||
if (!result) {
|
||||
Py_XDECREF(arglist);
|
||||
return python_error_to_status();
|
||||
return Status::NotImplemented("python error"); // TODO(pcm): https://github.com/pcmoritz/numbuf/issues/10
|
||||
}
|
||||
builder.AppendDict(PyDict_Size(result));
|
||||
subdicts.push_back(result);
|
||||
@@ -181,7 +174,9 @@ Status SerializeSequences(std::vector<PyObject*> sequences, std::shared_ptr<Arra
|
||||
int32_t offset = offsets->Value(i); \
|
||||
int8_t type = types->Value(i); \
|
||||
ArrayPtr arr = data->child(type); \
|
||||
SET_ITEM(result, i-start_idx, get_value(arr, offset, type, base)); \
|
||||
PyObject* value; \
|
||||
RETURN_NOT_OK(get_value(arr, offset, type, base, &value)); \
|
||||
SET_ITEM(result, i-start_idx, value); \
|
||||
} \
|
||||
} \
|
||||
*out = result; \
|
||||
@@ -245,7 +240,7 @@ Status DeserializeDict(std::shared_ptr<Array> array, int32_t start_idx, int32_t
|
||||
result = PyObject_CallObject(numbuf_deserialize_callback, arglist);
|
||||
if (!result) {
|
||||
Py_XDECREF(arglist);
|
||||
return python_error_to_status();
|
||||
return Status::NotImplemented("python error"); // TODO(pcm): https://github.com/pcmoritz/numbuf/issues/10
|
||||
}
|
||||
Py_XDECREF(arglist);
|
||||
}
|
||||
|
||||
@@ -17,8 +17,6 @@ arrow::Status DeserializeList(std::shared_ptr<arrow::Array> array, int32_t start
|
||||
arrow::Status DeserializeTuple(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out);
|
||||
arrow::Status DeserializeDict(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out);
|
||||
|
||||
arrow::Status python_error_to_status();
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
@@ -53,7 +53,11 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) {
|
||||
if (PyList_Check(value)) {
|
||||
Status s = SerializeSequences(std::vector<PyObject*>({value}), &array);
|
||||
if (!s.ok()) {
|
||||
PyErr_SetString(NumbufError, s.ToString().c_str());
|
||||
// If this condition is true, there was an error in the callback that
|
||||
// needs to be passed through
|
||||
if (!PyErr_Occurred()) {
|
||||
PyErr_SetString(NumbufError, s.ToString().c_str());
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -131,7 +135,15 @@ static PyObject* deserialize_list(PyObject* self, PyObject* args) {
|
||||
return NULL;
|
||||
}
|
||||
PyObject* result;
|
||||
ARROW_CHECK_OK(DeserializeList((*data)->column(0), 0, (*data)->num_rows(), base, &result));
|
||||
Status s = DeserializeList((*data)->column(0), 0, (*data)->num_rows(), base, &result);
|
||||
if (!s.ok()) {
|
||||
// If this condition is true, there was an error in the callback that
|
||||
// needs to be passed through
|
||||
if (!PyErr_Occurred()) {
|
||||
PyErr_SetString(NumbufError, s.ToString().c_str());
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user