mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 16:14:06 +08:00
Make photon client into a C extension. (#8)
* Make photon client into a C extension. * Fix formatting. * Rename extension from PhotonClient to Photon. * Update common submodule. * Fix Makefile to compile with fPIC. * Update common submodule. * Compile C extension against common. * Fix formatting. * Remove unnecessary include. * Update common submodule and rename Photon -> PhotonClient. * Drop global interpretor lock during get_task.
This commit is contained in:
committed by
Philipp Moritz
parent
a048ad954a
commit
18934c3a45
@@ -1,99 +0,0 @@
|
||||
import ctypes
|
||||
import os
|
||||
|
||||
photon_client_library_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/photon_client.so")
|
||||
photon_client_library = ctypes.cdll.LoadLibrary(photon_client_library_path)
|
||||
photon_client_library.alloc_task_spec.restype = ctypes.c_void_p
|
||||
photon_client_library.photon_connect.restype = ctypes.c_void_p
|
||||
photon_client_library.photon_submit.restype = None
|
||||
photon_client_library.photon_get_task.restype = ctypes.c_void_p
|
||||
|
||||
ID = ctypes.c_ubyte * 20
|
||||
|
||||
buffer_from_read_write_memory = ctypes.pythonapi.PyBuffer_FromReadWriteMemory
|
||||
buffer_from_read_write_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64]
|
||||
buffer_from_read_write_memory.restype = ctypes.py_object
|
||||
|
||||
buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory
|
||||
buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64]
|
||||
buffer_from_memory.restype = ctypes.py_object
|
||||
|
||||
photon_client_library.task_function.restype = ctypes.c_void_p
|
||||
photon_client_library.task_num_args.restype = ctypes.c_int64
|
||||
photon_client_library.task_num_returns.restype = ctypes.c_int64
|
||||
photon_client_library.task_arg_type.restype = ctypes.c_int8
|
||||
photon_client_library.task_arg_id.restype = ctypes.c_void_p
|
||||
photon_client_library.task_arg_val.restype = ctypes.c_void_p
|
||||
photon_client_library.task_arg_length.restype = ctypes.c_void_p
|
||||
photon_client_library.task_return.restype = ctypes.c_void_p
|
||||
|
||||
|
||||
class TaskInfo(object):
|
||||
def __init__(self, function_id, args, return_ids):
|
||||
self.function_id = function_id
|
||||
self.args = args
|
||||
self.return_ids = return_ids
|
||||
|
||||
def extract_task(c_task):
|
||||
function_id = buffer_from_memory(photon_client_library.task_function(c_task), 20)[:]
|
||||
num_args = photon_client_library.task_num_args(c_task)
|
||||
num_returns = photon_client_library.task_num_returns(c_task)
|
||||
arg_vals_and_ids = []
|
||||
for i in range(num_args):
|
||||
arg_type = photon_client_library.task_arg_type(c_task, i)
|
||||
if arg_type == 0:
|
||||
arg_id = buffer_from_memory(photon_client_library.task_arg_id(c_task, i), 20)
|
||||
arg_vals_and_ids.append((arg_type, arg_id))
|
||||
elif arg_type == 1:
|
||||
arg_val = photon_client_library.task_arg_val(c_task, i)[:]
|
||||
arg_length = photon_client_library.task_arg_length(c_task, i)
|
||||
arg_value = buffer_from_memory(arg_val, arg_length)[:]
|
||||
arg_vals_and_ids.append((arg_type, arg_value))
|
||||
else:
|
||||
raise Exception("arg_type must be 0 or 1")
|
||||
return_ids = []
|
||||
for i in range(num_returns):
|
||||
ret_id = buffer_from_memory(photon_client_library.task_return(c_task, i), 20)
|
||||
return_ids.append(ret_id[:])
|
||||
return TaskInfo(function_id, arg_vals_and_ids, return_ids)
|
||||
|
||||
class UniqueID(ctypes.Structure):
|
||||
_fields_ = [("unique_id", ID)]
|
||||
|
||||
def make_id(string):
|
||||
if len(string) != 20:
|
||||
raise Exception("PlasmaIDs must be 20 characters long")
|
||||
unique_id = map(ord, string)
|
||||
return UniqueID(unique_id=ID(*unique_id))
|
||||
|
||||
class Task(object):
|
||||
def __init__(self, function_id, args, return_ids):
|
||||
function_id = make_id(function_id)
|
||||
self.task_spec = ctypes.c_void_p(photon_client_library.alloc_task_spec(function_id, len(args), 1, 0))
|
||||
for arg in args:
|
||||
photon_client_library.task_args_add_ref(self.task_spec, make_id(arg))
|
||||
|
||||
# Add return IDs. This may not be the appropriate place for this.
|
||||
num_returns = photon_client_library.task_num_returns(self.task_spec)
|
||||
for i in range(num_returns):
|
||||
ret_id = buffer_from_read_write_memory(photon_client_library.task_return(self.task_spec, i), 20)
|
||||
for j in range(20):
|
||||
ret_id[j] = return_ids[i][j]
|
||||
|
||||
def __del__(self):
|
||||
photon_client_library.free_task_spec(self.task_spec)
|
||||
|
||||
class PhotonClient(object):
|
||||
|
||||
def __init__(self, socket_name):
|
||||
self.photon_conn = ctypes.c_void_p(photon_client_library.photon_connect(socket_name))
|
||||
|
||||
def submit(self, function_id, args, return_ids):
|
||||
task = Task(function_id, args, return_ids)
|
||||
photon_client_library.photon_submit(self.photon_conn, task.task_spec)
|
||||
|
||||
def get_task(self):
|
||||
c_task = ctypes.c_void_p(photon_client_library.photon_get_task(self.photon_conn))
|
||||
task = c_task # TODO Extract the actual task. EXTRACT...(c_task)
|
||||
# photon_client_library.free_task_spec(c_task)
|
||||
return extract_task(task)
|
||||
@@ -0,0 +1,140 @@
|
||||
#include <Python.h>
|
||||
|
||||
#include "common_extension.h"
|
||||
#include "photon_client.h"
|
||||
#include "task.h"
|
||||
|
||||
PyObject *PhotonError;
|
||||
|
||||
// clang-format off
|
||||
typedef struct {
|
||||
PyObject_HEAD
|
||||
photon_conn *photon_connection;
|
||||
} PyPhotonClient;
|
||||
// clang-format on
|
||||
|
||||
static int PyPhotonClient_init(PyPhotonClient *self, PyObject *args,
|
||||
PyObject *kwds) {
|
||||
char *socket_name;
|
||||
if (!PyArg_ParseTuple(args, "s", &socket_name)) {
|
||||
return -1;
|
||||
}
|
||||
self->photon_connection = photon_connect(socket_name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void PyPhotonClient_dealloc(PyPhotonClient *self) {
|
||||
free(((PyPhotonClient *)self)->photon_connection);
|
||||
Py_TYPE(self)->tp_free((PyObject *)self);
|
||||
}
|
||||
|
||||
static PyObject *PyPhotonClient_submit(PyObject *self, PyObject *args) {
|
||||
PyObject *py_task;
|
||||
if (!PyArg_ParseTuple(args, "O", &py_task)) {
|
||||
return NULL;
|
||||
}
|
||||
photon_submit(((PyPhotonClient *)self)->photon_connection,
|
||||
((PyTask *)py_task)->spec);
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
// clang-format off
|
||||
static PyObject *PyPhotonClient_get_task(PyObject *self) {
|
||||
task_spec *task_spec;
|
||||
/* Drop the global interpreter lock while we get a task because
|
||||
* photon_get_task may block for a long time. */
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
task_spec = photon_get_task(((PyPhotonClient *)self)->photon_connection);
|
||||
Py_END_ALLOW_THREADS
|
||||
return PyTask_make(task_spec);
|
||||
}
|
||||
// clang-format on
|
||||
|
||||
static PyMethodDef PyPhotonClient_methods[] = {
|
||||
{"submit", (PyCFunction)PyPhotonClient_submit, METH_VARARGS,
|
||||
"Submit a task to the local scheduler."},
|
||||
{"get_task", (PyCFunction)PyPhotonClient_get_task, METH_NOARGS,
|
||||
"Get a task from the local scheduler."},
|
||||
{NULL} /* Sentinel */
|
||||
};
|
||||
|
||||
static PyTypeObject PyPhotonClientType = {
|
||||
PyObject_HEAD_INIT(NULL) 0, /* ob_size */
|
||||
"photon.PhotonClient", /* tp_name */
|
||||
sizeof(PyPhotonClient), /* tp_basicsize */
|
||||
0, /* tp_itemsize */
|
||||
(destructor)PyPhotonClient_dealloc, /* tp_dealloc */
|
||||
0, /* tp_print */
|
||||
0, /* tp_getattr */
|
||||
0, /* tp_setattr */
|
||||
0, /* tp_compare */
|
||||
0, /* tp_repr */
|
||||
0, /* tp_as_number */
|
||||
0, /* tp_as_sequence */
|
||||
0, /* tp_as_mapping */
|
||||
0, /* tp_hash */
|
||||
0, /* tp_call */
|
||||
0, /* tp_str */
|
||||
0, /* tp_getattro */
|
||||
0, /* tp_setattro */
|
||||
0, /* tp_as_buffer */
|
||||
Py_TPFLAGS_DEFAULT, /* tp_flags */
|
||||
"PhotonClient object", /* tp_doc */
|
||||
0, /* tp_traverse */
|
||||
0, /* tp_clear */
|
||||
0, /* tp_richcompare */
|
||||
0, /* tp_weaklistoffset */
|
||||
0, /* tp_iter */
|
||||
0, /* tp_iternext */
|
||||
PyPhotonClient_methods, /* tp_methods */
|
||||
0, /* tp_members */
|
||||
0, /* tp_getset */
|
||||
0, /* tp_base */
|
||||
0, /* tp_dict */
|
||||
0, /* tp_descr_get */
|
||||
0, /* tp_descr_set */
|
||||
0, /* tp_dictoffset */
|
||||
(initproc)PyPhotonClient_init, /* tp_init */
|
||||
0, /* tp_alloc */
|
||||
PyType_GenericNew, /* tp_new */
|
||||
};
|
||||
|
||||
static PyMethodDef photon_methods[] = {
|
||||
{"check_simple_value", check_simple_value, METH_VARARGS,
|
||||
"Should the object be passed by value?"},
|
||||
{NULL} /* Sentinel */
|
||||
};
|
||||
|
||||
#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
|
||||
#define PyMODINIT_FUNC void
|
||||
#endif
|
||||
|
||||
PyMODINIT_FUNC initphoton(void) {
|
||||
PyObject *m;
|
||||
|
||||
if (PyType_Ready(&PyTaskType) < 0)
|
||||
return;
|
||||
|
||||
if (PyType_Ready(&PyObjectIDType) < 0)
|
||||
return;
|
||||
|
||||
if (PyType_Ready(&PyPhotonClientType) < 0)
|
||||
return;
|
||||
|
||||
m = Py_InitModule3("photon", photon_methods,
|
||||
"A module for the local scheduler.");
|
||||
|
||||
Py_INCREF(&PyTaskType);
|
||||
PyModule_AddObject(m, "Task", (PyObject *)&PyTaskType);
|
||||
|
||||
Py_INCREF(&PyObjectIDType);
|
||||
PyModule_AddObject(m, "ObjectID", (PyObject *)&PyObjectIDType);
|
||||
|
||||
Py_INCREF(&PyPhotonClientType);
|
||||
PyModule_AddObject(m, "PhotonClient", (PyObject *)&PyPhotonClientType);
|
||||
|
||||
char photon_error[] = "photon.error";
|
||||
PhotonError = PyErr_NewException(photon_error, NULL, NULL);
|
||||
Py_INCREF(PhotonError);
|
||||
PyModule_AddObject(m, "photon_error", PhotonError);
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
from setuptools import setup, find_packages, Extension
|
||||
|
||||
photon_module = Extension("photon",
|
||||
sources=["photon_extension.c", "../../common/lib/python/common_extension.c"],
|
||||
include_dirs=["../../", "../../common/",
|
||||
"../../common/thirdparty/",
|
||||
"../../common/lib/python"],
|
||||
extra_objects=["../../build/photon_client.a", "../../common/build/libcommon.a"],
|
||||
extra_compile_args=["--std=c99", "-Werror"])
|
||||
|
||||
setup(name="Photon",
|
||||
version="0.1",
|
||||
description="Photon library for Ray",
|
||||
ext_modules=[photon_module])
|
||||
Reference in New Issue
Block a user