mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 05:58:55 +08:00
Convert to streaming sockets (#7)
* Convert to streaming sockets * Formatting
This commit is contained in:
committed by
Philipp Moritz
parent
0b7d81cae6
commit
ff11ee21ef
@@ -17,7 +17,7 @@ int bind_ipc_sock(const char *socket_pathname) {
|
||||
struct sockaddr_un socket_address;
|
||||
int socket_fd;
|
||||
|
||||
socket_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
|
||||
socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (socket_fd < 0) {
|
||||
LOG_ERR("socket() failed for pathname %s.", socket_pathname);
|
||||
return -1;
|
||||
@@ -38,6 +38,7 @@ int bind_ipc_sock(const char *socket_pathname) {
|
||||
LOG_ERR("Bind failed for pathname %s.", socket_pathname);
|
||||
return -1;
|
||||
}
|
||||
listen(socket_fd, 5);
|
||||
|
||||
return socket_fd;
|
||||
}
|
||||
@@ -49,7 +50,7 @@ int connect_ipc_sock(const char *socket_pathname) {
|
||||
struct sockaddr_un socket_address;
|
||||
int socket_fd;
|
||||
|
||||
socket_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
|
||||
socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (socket_fd < 0) {
|
||||
LOG_ERR("socket() failed for pathname %s.", socket_pathname);
|
||||
return -1;
|
||||
@@ -77,33 +78,50 @@ int connect_ipc_sock(const char *socket_pathname) {
|
||||
void send_ipc_sock(int socket_fd, char *message) {
|
||||
int length = strlen(message);
|
||||
int nbytes;
|
||||
nbytes = send(socket_fd, (char *) &length, sizeof(length), 0);
|
||||
nbytes = write(socket_fd, (char *) &length, sizeof(length));
|
||||
if (nbytes == -1) {
|
||||
fprintf(stderr, "Error sending to socket.\n");
|
||||
LOG_ERR("Error sending to socket.\n");
|
||||
return;
|
||||
}
|
||||
nbytes = send(socket_fd, (char *) message, length * sizeof(char), 0);
|
||||
nbytes = write(socket_fd, (char *) message, length * sizeof(char));
|
||||
if (nbytes == -1) {
|
||||
fprintf(stderr, "Error sending to socket.\n");
|
||||
LOG_ERR("Error sending to socket.\n");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* Accept a new client connection on the given socket
|
||||
* descriptor. Returns a descriptor for the new socket. */
|
||||
int accept_client(int socket_fd) {
|
||||
struct sockaddr_un client_addr;
|
||||
int client_fd, client_len;
|
||||
client_len = sizeof(client_addr);
|
||||
client_fd = accept(socket_fd, (struct sockaddr *) &client_addr,
|
||||
(socklen_t *) &client_len);
|
||||
if (client_fd < 0) {
|
||||
LOG_ERR("Error reading from socket.");
|
||||
return -1;
|
||||
}
|
||||
return client_fd;
|
||||
}
|
||||
|
||||
/* Receives a message on the given socket file descriptor. Allocates and
|
||||
* returns a pointer to the message.
|
||||
* NOTE: Caller must free the message! */
|
||||
char *recv_ipc_sock(int socket_fd) {
|
||||
int length;
|
||||
int nbytes;
|
||||
nbytes = recv(socket_fd, &length, sizeof(length), 0);
|
||||
if (nbytes == -1) {
|
||||
fprintf(stderr, "Error receiving from socket.\n");
|
||||
nbytes = read(socket_fd, &length, sizeof(length));
|
||||
if (nbytes < 0) {
|
||||
LOG_ERR("Error reading length of message from socket.");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char *message = malloc((length + 1) * sizeof(char));
|
||||
nbytes = recv(socket_fd, message, length * sizeof(char), 0);
|
||||
if (nbytes == -1) {
|
||||
fprintf(stderr, "Error receiving from socket.\n");
|
||||
nbytes = read(socket_fd, message, length);
|
||||
if (nbytes < 0) {
|
||||
LOG_ERR("Error reading message from socket.");
|
||||
free(message);
|
||||
return NULL;
|
||||
}
|
||||
message[length] = '\0';
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
int bind_ipc_sock(const char* socket_pathname);
|
||||
int connect_ipc_sock(const char* socket_pathname);
|
||||
void send_ipc_sock(int socket_fd, char* message);
|
||||
int accept_client(int socket_fd);
|
||||
char* recv_ipc_sock(int socket_fd);
|
||||
|
||||
#endif
|
||||
|
||||
+8
-4
@@ -5,7 +5,7 @@
|
||||
|
||||
#include "sockets.h"
|
||||
|
||||
SUITE(event_loop_tests);
|
||||
SUITE(socket_tests);
|
||||
|
||||
TEST ipc_socket_test(void) {
|
||||
const char* socket_pathname = "test-socket";
|
||||
@@ -20,11 +20,15 @@ TEST ipc_socket_test(void) {
|
||||
ASSERT(socket_fd >= 0);
|
||||
send_ipc_sock(socket_fd, test_string);
|
||||
close(socket_fd);
|
||||
exit(0);
|
||||
} else {
|
||||
char* message = recv_ipc_sock(socket_fd);
|
||||
int client_fd = accept_client(socket_fd);
|
||||
ASSERT(client_fd >= 0);
|
||||
char* message = recv_ipc_sock(client_fd);
|
||||
ASSERT(message != NULL);
|
||||
ASSERT_STR_EQ(test_string, message);
|
||||
free(message);
|
||||
close(client_fd);
|
||||
close(socket_fd);
|
||||
unlink(socket_pathname);
|
||||
}
|
||||
@@ -32,7 +36,7 @@ TEST ipc_socket_test(void) {
|
||||
PASS();
|
||||
}
|
||||
|
||||
SUITE(event_loop_tests) {
|
||||
SUITE(socket_tests) {
|
||||
RUN_TEST(ipc_socket_test);
|
||||
}
|
||||
|
||||
@@ -40,6 +44,6 @@ GREATEST_MAIN_DEFS();
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
GREATEST_MAIN_BEGIN();
|
||||
RUN_SUITE(event_loop_tests);
|
||||
RUN_SUITE(socket_tests);
|
||||
GREATEST_MAIN_END();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user