mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 10:46:13 +08:00
implementing reading and writing tasks (#11)
This commit is contained in:
committed by
Robert Nishihara
parent
c238ae4aa0
commit
37f035dbd0
@@ -12,8 +12,8 @@ $(BUILD)/db_tests: hiredis test/db_tests.c thirdparty/greatest.h event_loop.c st
|
||||
$(BUILD)/io_tests: test/io_tests.c thirdparty/greatest.h io.c
|
||||
$(CC) -o $@ test/io_tests.c io.c $(CFLAGS) -I. -Ithirdparty
|
||||
|
||||
$(BUILD)/task_tests: test/task_tests.c task.c io.c common.h
|
||||
$(CC) -o $@ test/task_tests.c task.c io.c $(CFLAGS) -I. -Ithirdparty
|
||||
$(BUILD)/task_tests: test/task_tests.c task.h task.c io.h io.c common.h common.h common.c
|
||||
$(CC) -o $@ test/task_tests.c task.c io.c common.c $(CFLAGS) -I. -Ithirdparty
|
||||
|
||||
clean:
|
||||
rm -r $(BUILD)/*
|
||||
|
||||
@@ -1,5 +1,23 @@
|
||||
#include "common.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
unique_id globally_unique_id(void) {
|
||||
/* Use /dev/urandom for "real" randomness. */
|
||||
int fd;
|
||||
if ((fd = open("/dev/urandom", O_RDONLY)) == -1) {
|
||||
LOG_ERR("Could not generate random number");
|
||||
}
|
||||
unique_id result;
|
||||
read(fd, &result.id[0], UNIQUE_ID_SIZE);
|
||||
close(fd);
|
||||
return result;
|
||||
}
|
||||
|
||||
char *sha1_to_hex(const unsigned char *sha1, char *buffer) {
|
||||
static const char hex[] = "0123456789abcdef";
|
||||
char *buf = buffer;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#ifndef COMMON_H
|
||||
#define COMMON_H
|
||||
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
|
||||
#ifdef NDEBUG
|
||||
@@ -29,6 +30,9 @@
|
||||
|
||||
typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id;
|
||||
|
||||
/* Generate a globally unique ID. */
|
||||
unique_id globally_unique_id(void);
|
||||
|
||||
/* Convert a 20 byte sha1 hash to a hexdecimal string. This function assumes
|
||||
* that buffer points to an already allocated char array of size 2 *
|
||||
* UNIQUE_ID_SIZE + 1 */
|
||||
|
||||
@@ -47,12 +47,16 @@ struct task_spec_impl {
|
||||
task_arg args_and_returns[0];
|
||||
};
|
||||
|
||||
/* The size of a task specification is given by the following expression. */
|
||||
#define TASK_SPEC_SIZE(NUM_ARGS, NUM_RETURNS, ARGS_VALUE_SIZE) \
|
||||
(sizeof(task_spec) + ((NUM_ARGS) + (NUM_RETURNS)) * sizeof(task_arg) + \
|
||||
(ARGS_VALUE_SIZE))
|
||||
|
||||
task_spec *alloc_task_spec(function_id func_id,
|
||||
int64_t num_args,
|
||||
int64_t num_returns,
|
||||
int64_t args_value_size) {
|
||||
int64_t size = sizeof(task_spec) +
|
||||
(num_args + num_returns) * sizeof(task_arg) + args_value_size;
|
||||
int64_t size = TASK_SPEC_SIZE(num_args, num_returns, args_value_size);
|
||||
task_spec *task = malloc(size);
|
||||
memset(task, 0, size);
|
||||
task->func_id = func_id;
|
||||
@@ -63,6 +67,11 @@ task_spec *alloc_task_spec(function_id func_id,
|
||||
return task;
|
||||
}
|
||||
|
||||
int64_t task_size(task_spec *spec) {
|
||||
return TASK_SPEC_SIZE(spec->num_args, spec->num_returns,
|
||||
spec->args_value_size);
|
||||
}
|
||||
|
||||
int64_t task_num_args(task_spec *spec) {
|
||||
return spec->num_args;
|
||||
}
|
||||
@@ -131,3 +140,16 @@ void free_task_spec(task_spec *spec) {
|
||||
CHECK(spec->arg_index == spec->num_args); /* Task was fully constructed */
|
||||
free(spec);
|
||||
}
|
||||
|
||||
void write_task(int fd, task_spec *spec) {
|
||||
write_bytes(fd, (uint8_t *) spec, task_size(spec));
|
||||
}
|
||||
|
||||
task_spec *read_task(int fd) {
|
||||
uint8_t *bytes;
|
||||
int64_t length;
|
||||
read_bytes(fd, &bytes, &length);
|
||||
task_spec *spec = (task_spec *) bytes;
|
||||
CHECK(task_size(spec) == length);
|
||||
return spec;
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
#ifndef TASK_H
|
||||
#define TASK_H
|
||||
|
||||
/* This API specifies the task data structure. It is in C so we can
|
||||
* easily construct tasks from other languages like Python. The datastructures
|
||||
* are also defined in such a way that memory is contiguous and all pointers
|
||||
@@ -24,6 +27,9 @@ task_spec *alloc_task_spec(function_id func_id,
|
||||
int64_t num_returns,
|
||||
int64_t args_value_size);
|
||||
|
||||
/* Size of the task in bytes. */
|
||||
int64_t task_size(task_spec *spec);
|
||||
|
||||
/* Getting the number of arguments and returns. */
|
||||
int64_t task_num_args(task_spec *spec);
|
||||
int64_t task_num_returns(task_spec *spec);
|
||||
@@ -46,8 +52,10 @@ unique_id *task_return(task_spec *spec, int64_t ret_index);
|
||||
void free_task_spec(task_spec *spec);
|
||||
|
||||
/* Write the task specification to a file or socket. */
|
||||
int send_task(int fd, task_spec *spec);
|
||||
void write_task(int fd, task_spec *spec);
|
||||
|
||||
/* Read the task specification from a file or socket. It is the user's
|
||||
* responsibility to free the task after it has been used. */
|
||||
task_spec *recv_task(int fd);
|
||||
task_spec *read_task(int fd);
|
||||
|
||||
#endif
|
||||
|
||||
+28
-12
@@ -1,29 +1,29 @@
|
||||
#include "greatest.h"
|
||||
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include "common.h"
|
||||
#include "task.h"
|
||||
|
||||
SUITE(task_tests);
|
||||
|
||||
TEST task_test(void) {
|
||||
function_id func_id = {
|
||||
{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}};
|
||||
task_spec* task = alloc_task_spec(func_id, 4, 2, 10);
|
||||
function_id func_id = globally_unique_id();
|
||||
task_spec *task = alloc_task_spec(func_id, 4, 2, 10);
|
||||
ASSERT(task_num_args(task) == 4);
|
||||
ASSERT(task_num_returns(task) == 2);
|
||||
|
||||
unique_id arg1 = {
|
||||
{2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2}};
|
||||
unique_id arg1 = globally_unique_id();
|
||||
ASSERT(task_args_add_ref(task, arg1) == 0);
|
||||
ASSERT(task_args_add_val(task, (uint8_t*) "hello", 5) == 1);
|
||||
unique_id arg2 = {
|
||||
{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3}};
|
||||
unique_id arg2 = globally_unique_id();
|
||||
ASSERT(task_args_add_ref(task, arg2) == 2);
|
||||
ASSERT(task_args_add_val(task, (uint8_t*) "world", 5) == 3);
|
||||
|
||||
unique_id ret0 = {
|
||||
{4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4}};
|
||||
unique_id ret1 = {
|
||||
{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5}};
|
||||
unique_id ret0 = globally_unique_id();
|
||||
unique_id ret1 = globally_unique_id();
|
||||
memcpy(task_return(task, 0), &ret0, sizeof(ret0));
|
||||
memcpy(task_return(task, 1), &ret1, sizeof(ret1));
|
||||
|
||||
@@ -41,13 +41,29 @@ TEST task_test(void) {
|
||||
PASS();
|
||||
}
|
||||
|
||||
TEST send_task(void) {
|
||||
function_id func_id = globally_unique_id();
|
||||
task_spec *task = alloc_task_spec(func_id, 4, 2, 10);
|
||||
*task_return(task, 1) = globally_unique_id();
|
||||
int fd[2];
|
||||
socketpair(AF_UNIX, SOCK_STREAM, 0, fd);
|
||||
write_task(fd[0], task);
|
||||
task_spec *result = read_task(fd[1]);
|
||||
ASSERT(memcmp(task, result, task_size(task)) == 0);
|
||||
ASSERT(memcmp(task, result, task_size(result)) == 0);
|
||||
free(task);
|
||||
free(result);
|
||||
PASS();
|
||||
}
|
||||
|
||||
SUITE(task_tests) {
|
||||
RUN_TEST(task_test);
|
||||
RUN_TEST(send_task);
|
||||
}
|
||||
|
||||
GREATEST_MAIN_DEFS();
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
int main(int argc, char **argv) {
|
||||
GREATEST_MAIN_BEGIN();
|
||||
RUN_SUITE(task_tests);
|
||||
GREATEST_MAIN_END();
|
||||
|
||||
Reference in New Issue
Block a user