diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..3f2c8add4 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "common"] + path = common + url = https://github.com/ray-project/common diff --git a/.travis.yml b/.travis.yml index fd43573ca..45ef9a286 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,5 +31,11 @@ matrix: - .travis/check-git-clang-format-output.sh install: + - make script: + - cd common + - make test + - cd .. + - source setup-env.sh + - python test/test.py diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..7c494a74e --- /dev/null +++ b/Makefile @@ -0,0 +1,21 @@ +CC = gcc +CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -Icommon/thirdparty +BUILD = build + +all: $(BUILD)/photon_scheduler $(BUILD)/photon_client.so + +$(BUILD)/photon_client.so: photon_client.h photon_client.c common + $(CC) $(CFLAGS) photon_client.c common/build/libcommon.a -fPIC -shared -o $(BUILD)/photon_client.so + +$(BUILD)/photon_scheduler: photon.h photon.c common + $(CC) $(CFLAGS) -o $@ photon.c common/build/libcommon.a common/thirdparty/hiredis/libhiredis.a -Icommon/thirdparty -Icommon/ + +common: FORCE + git submodule update --init --recursive + cd common; make + +clean: + cd common; make clean + rm -r $(BUILD)/* + +FORCE: diff --git a/build/.gitkeep b/build/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/common b/common new file mode 160000 index 000000000..084220b0e --- /dev/null +++ b/common @@ -0,0 +1 @@ +Subproject commit 084220b0e70de6bed466e97e08f4b6909133aafb diff --git a/lib/python/photon.py b/lib/python/photon.py new file mode 100644 index 000000000..b0fc263a5 --- /dev/null +++ b/lib/python/photon.py @@ -0,0 +1,38 @@ +import ctypes +import os + +photon_client_library_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/photon_client.so") +photon_client_library = ctypes.cdll.LoadLibrary(photon_client_library_path) +photon_client_library.alloc_task_spec.restype = ctypes.c_void_p +photon_client_library.photon_connect.restype = ctypes.c_void_p +photon_client_library.photon_submit.restype = None + +ID = ctypes.c_ubyte * 20 + +class UniqueID(ctypes.Structure): + _fields_ = [("unique_id", ID)] + +def make_id(string): + if len(string) != 20: + raise Exception("PlasmaIDs must be 20 characters long") + unique_id = map(ord, string) + return UniqueID(unique_id=ID(*unique_id)) + +class Task(object): + def __init__(self, function_id, args): + function_id = make_id(function_id) + self.task_spec = ctypes.c_void_p(photon_client_library.alloc_task_spec(function_id, len(args), 1, 0)) + for arg in args: + photon_client_library.task_args_add_ref(self.task_spec, arg) + + def __del__(self): + photon_client_library.free_task_spec(self.task_spec) + +class PhotonClient(object): + + def __init__(self, socket_name): + self.photon_conn = ctypes.c_void_p(photon_client_library.photon_connect(socket_name)) + + def submit(self, function_id, args): + task = Task(function_id, args) + photon_client_library.photon_submit(self.photon_conn, task.task_spec) diff --git a/photon.c b/photon.c new file mode 100644 index 000000000..8c84c2b95 --- /dev/null +++ b/photon.c @@ -0,0 +1,109 @@ +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "event_loop.h" +#include "io.h" +#include "photon.h" +#include "state/db.h" +#include "state/task_queue.h" +#include "task.h" +#include "utarray.h" + +typedef struct { + db_handle *db; + UT_array *task_queue; +} local_scheduler_state; + +event_loop *init_local_scheduler() { return event_loop_create(); }; + +void process_message(event_loop *loop, int client_sock, void *context, + int events) { + local_scheduler_state *s = context; + + uint8_t *message; + int64_t type; + int64_t length; + read_message(client_sock, &type, &length, &message); + + switch (type) { + case SUBMIT_TASK: { + task_spec *task = (task_spec *)message; + CHECK(task_size(task) == length); + unique_id id = globally_unique_id(); + task_queue_submit_task(s->db, id, task); + } break; + case TASK_DONE: { + } break; + case DISCONNECT_CLIENT: { + LOG_INFO("Disconnecting client on fd %d", client_sock); + event_loop_remove_file(loop, client_sock); + } break; + case LOG_MESSAGE: { + } break; + default: + /* This code should be unreachable. */ + CHECK(0); + } + free(message); +} + +void new_client_connection(event_loop *loop, int listener_sock, void *context, + int events) { + local_scheduler_state *s = context; + int new_socket = accept_client(listener_sock); + event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, s); + LOG_INFO("new connection with fd %d", new_socket); +} + +void start_server(const char *socket_name, const char *redis_addr, + int redis_port) { + int fd = bind_ipc_sock(socket_name); + local_scheduler_state state; + event_loop *loop = init_local_scheduler(); + + state.db = db_connect(redis_addr, redis_port, "photon", "", -1); + db_attach(state.db, loop); + + /* Run event loop. */ + event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, &state); + event_loop_run(loop); +} + +int main(int argc, char *argv[]) { + /* Path of the listening socket of the local scheduler. */ + char *scheduler_socket_name = NULL; + /* IP address and port of redis. */ + char *redis_addr_port = NULL; + int c; + while ((c = getopt(argc, argv, "s:r:")) != -1) { + switch (c) { + case 's': + scheduler_socket_name = optarg; + break; + case 'r': + redis_addr_port = optarg; + break; + default: + LOG_ERR("unknown option %c", c); + exit(-1); + } + } + if (!scheduler_socket_name) { + LOG_ERR("please specify socket for incoming connections with -s switch"); + exit(-1); + } + char redis_addr[16] = {0}; + char redis_port[6] = {0}; + if (!redis_addr_port || + sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) != + 2) { + LOG_ERR("need to specify redis address like 127.0.0.1:6379 with -r switch"); + exit(-1); + } + start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port)); +} diff --git a/photon.h b/photon.h new file mode 100644 index 000000000..6a213c4a5 --- /dev/null +++ b/photon.h @@ -0,0 +1,14 @@ +#ifndef PHOTON_H +#define PHOTON_H + +enum photon_message_type { + /** Notify the local scheduler that a task has finished. */ + TASK_DONE = 64, +}; + +struct photon_conn_impl { + /* File descriptor of the Unix domain socket that connects to photon. */ + int conn; +}; + +#endif diff --git a/photon_client.c b/photon_client.c new file mode 100644 index 000000000..a33b25631 --- /dev/null +++ b/photon_client.c @@ -0,0 +1,27 @@ +#include "photon_client.h" + +#include "common/io.h" +#include "common/task.h" +#include + +photon_conn *photon_connect(const char *photon_socket) { + photon_conn *result = malloc(sizeof(photon_conn)); + result->conn = connect_ipc_sock(photon_socket); + return result; +} + +void photon_submit(photon_conn *conn, task_spec *task) { + write_message(conn->conn, SUBMIT_TASK, task_size(task), (uint8_t *)task); +} + +void photon_task_done(photon_conn *conn) { + write_message(conn->conn, TASK_DONE, 0, NULL); +} + +void photon_disconnect(photon_conn *conn) { + write_message(conn->conn, DISCONNECT_CLIENT, 0, NULL); +} + +void photon_log_message(photon_conn *conn) { + write_message(conn->conn, LOG_MESSAGE, 0, NULL); +} diff --git a/photon_client.h b/photon_client.h new file mode 100644 index 000000000..3163d8b7d --- /dev/null +++ b/photon_client.h @@ -0,0 +1,27 @@ +#ifndef PHOTON_CLIENT_H +#define PHOTON_CLIENT_H + +#include "common/task.h" +#include "photon.h" + +typedef struct photon_conn_impl photon_conn; + +/* Connect to the local scheduler. */ +photon_conn *photon_connect(const char *photon_socket); + +/* Submit a task to the local scheduler. */ +void photon_submit(photon_conn *conn, task_spec *task); + +/* Get next task for this client. */ +task_spec *photon_get_task(photon_conn *conn); + +/* Tell the local scheduler that the client has finished executing a task. */ +void photon_task_done(photon_conn *conn); + +/* Disconnect from the local scheduler. */ +void photon_disconnect(photon_conn *conn); + +/* Send a log message to the local scheduler. */ +void photon_log_message(photon_conn *conn); + +#endif diff --git a/photon_scheduler.h b/photon_scheduler.h new file mode 100644 index 000000000..cce91155d --- /dev/null +++ b/photon_scheduler.h @@ -0,0 +1,15 @@ +#ifndef PHOTON_SCHEDULER +#define PHOTON_SCHEDULER + +/* Establish a connection to a new client. */ +void new_client_connection(local_scheduler_state *s, int listener_sock); + +/* schedule a task on a given worker. */ +void schedule_on_worker(local_scheduler_state *s, task_spec *task, + int client_id); + +/* Handle new incoming task that was scheduled by the globl scheduler on + * this local scheduler. */ +void schedule_task(local_scheduler_state *s, task_spec *task) + +#endif diff --git a/setup-env.sh b/setup-env.sh new file mode 100755 index 000000000..7c4350150 --- /dev/null +++ b/setup-env.sh @@ -0,0 +1,5 @@ +echo "Adding Photon to PYTHONPATH" 1>&2 + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) + +export PYTHONPATH="$ROOT_DIR/lib/python/:$PYTHONPATH" diff --git a/test/test.py b/test/test.py new file mode 100644 index 000000000..2307f2276 --- /dev/null +++ b/test/test.py @@ -0,0 +1,37 @@ +from __future__ import print_function + +import os +import subprocess +import sys +import unittest +import random +import time + +import photon + +class TestPhotonClient(unittest.TestCase): + + def setUp(self): + # Start Redis. + redis_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../common/thirdparty/redis-3.2.3/src/redis-server") + self.p1 = subprocess.Popen([redis_executable, "--loglevel", "warning"]) + time.sleep(0.1) + scheduler_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/photon_scheduler") + scheduler_name = "/tmp/scheduler{}".format(random.randint(0, 10000)) + self.p2 = subprocess.Popen([scheduler_executable, "-s", scheduler_name, "-r", "127.0.0.1:6379"]) + time.sleep(0.1) + # Connect to the scheduler. + self.photon_client = photon.PhotonClient(scheduler_name) + + def tearDown(self): + # Kill the Redis server. + self.p1.kill() + # Kill the local scheduler. + self.p2.kill() + + def test_create(self): + l = [photon.make_id(20 * "a"), photon.make_id(20 * "b"), photon.make_id(20 * "c")] + self.photon_client.submit(20 * "a", l) + +if __name__ == "__main__": + unittest.main(verbosity=2)