diff --git a/src/common/io.c b/src/common/io.c index d90c987a8..a857bfa03 100644 --- a/src/common/io.c +++ b/src/common/io.c @@ -90,7 +90,7 @@ int bind_ipc_sock(const char *socket_pathname, bool shall_listen) { } unlink(socket_pathname); - memset(&socket_address, 0, sizeof(struct sockaddr_un)); + memset(&socket_address, 0, sizeof(socket_address)); socket_address.sun_family = AF_UNIX; if (strlen(socket_pathname) + 1 > sizeof(socket_address.sun_path)) { LOG_ERROR("Socket pathname is too long."); @@ -101,7 +101,7 @@ int bind_ipc_sock(const char *socket_pathname, bool shall_listen) { strlen(socket_pathname) + 1); if (bind(socket_fd, (struct sockaddr *) &socket_address, - sizeof(struct sockaddr_un)) != 0) { + sizeof(socket_address)) != 0) { LOG_ERROR("Bind failed for pathname %s.", socket_pathname); close(socket_fd); return -1; @@ -129,7 +129,7 @@ int connect_ipc_sock(const char *socket_pathname) { return -1; } - memset(&socket_address, 0, sizeof(struct sockaddr_un)); + memset(&socket_address, 0, sizeof(socket_address)); socket_address.sun_family = AF_UNIX; if (strlen(socket_pathname) + 1 > sizeof(socket_address.sun_path)) { LOG_ERROR("Socket pathname is too long."); @@ -139,7 +139,7 @@ int connect_ipc_sock(const char *socket_pathname) { strlen(socket_pathname) + 1); if (connect(socket_fd, (struct sockaddr *) &socket_address, - sizeof(struct sockaddr_un)) != 0) { + sizeof(socket_address)) != 0) { LOG_ERROR("Connection to socket failed for pathname %s.", socket_pathname); return -1; } @@ -278,11 +278,11 @@ int read_bytes(int fd, uint8_t *cursor, size_t length) { * @return Void. */ void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) { - int closed = read_bytes(fd, (uint8_t *) type, sizeof(int64_t)); + int closed = read_bytes(fd, (uint8_t *) type, sizeof(*type)); if (closed) { goto disconnected; } - closed = read_bytes(fd, (uint8_t *) length, sizeof(int64_t)); + closed = read_bytes(fd, (uint8_t *) length, sizeof(*length)); if (closed) { goto disconnected; } @@ -321,11 +321,11 @@ disconnected: */ int64_t read_buffer(int fd, int64_t *type, UT_array *buffer) { int64_t length; - int closed = read_bytes(fd, (uint8_t *) type, sizeof(int64_t)); + int closed = read_bytes(fd, (uint8_t *) type, sizeof(*type)); if (closed) { goto disconnected; } - closed = read_bytes(fd, (uint8_t *) &length, sizeof(int64_t)); + closed = read_bytes(fd, (uint8_t *) &length, sizeof(length)); if (closed) { goto disconnected; } diff --git a/src/common/lib/python/common_extension.c b/src/common/lib/python/common_extension.c index 23c2058c3..6672bfa38 100644 --- a/src/common/lib/python/common_extension.c +++ b/src/common/lib/python/common_extension.c @@ -33,7 +33,7 @@ static int PyObjectID_init(PyObjectID *self, PyObject *args, PyObject *kwds) { "ObjectID: object id string needs to have length 20"); return -1; } - memcpy(&self->object_id.id[0], data, sizeof(object_id)); + memcpy(&self->object_id.id[0], data, sizeof(self->object_id.id)); return 0; } @@ -48,7 +48,7 @@ PyObject *PyObjectID_make(object_id object_id) { static PyObject *PyObjectID_id(PyObject *self) { PyObjectID *s = (PyObjectID *) self; return PyString_FromStringAndSize((char *) &s->object_id.id[0], - sizeof(object_id)); + sizeof(s->object_id.id)); } static PyObject *PyObjectID_richcompare(PyObjectID *self, diff --git a/src/common/logging.c b/src/common/logging.c index 100b2777d..010cd5682 100644 --- a/src/common/logging.c +++ b/src/common/logging.c @@ -65,8 +65,12 @@ void ray_log(ray_logger *logger, if (logger->is_direct) { db_handle *db = (db_handle *) logger->conn; /* Fill in the client ID and send the message to Redis. */ - redisAsyncCommand(db->context, NULL, NULL, utstring_body(formatted_message), - (char *) db->client.id, sizeof(db_client_id)); + int status = redisAsyncCommand( + db->context, NULL, NULL, utstring_body(formatted_message), + (char *) db->client.id, sizeof(db->client.id)); + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_DEBUG(db->context, "error while logging message to log table"); + } } else { /* If we don't own a Redis connection, we leave our client * ID to be filled in by someone else. */ diff --git a/src/common/state/object_table.c b/src/common/state/object_table.c index 0a894a460..48a9db0ff 100644 --- a/src/common/state/object_table.c +++ b/src/common/state/object_table.c @@ -45,7 +45,7 @@ void result_table_add(db_handle *db_handle, result_table_done_callback done_callback, void *user_context) { task_id *task_id_copy = malloc(sizeof(task_id)); - memcpy(task_id_copy, task_id_arg.id, sizeof(task_id)); + memcpy(task_id_copy, task_id_arg.id, sizeof(*task_id_copy)); init_table_callback(db_handle, object_id, __func__, task_id_copy, retry, done_callback, redis_result_table_add, user_context); } diff --git a/src/common/state/redis.c b/src/common/state/redis.c index 4ea8753e4..f0e4ae30c 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -17,12 +17,6 @@ #include "redis.h" #include "io.h" -#define LOG_REDIS_ERROR(context, M, ...) \ - LOG_ERROR("Redis error %d %s; %s", context->err, context->errstr, M) - -#define LOG_REDIS_DEBUG(context, M, ...) \ - LOG_DEBUG("Redis error %d %s; %s", context->err, context->errstr, M) - #define CHECK_REDIS_CONNECT(CONTEXT_TYPE, context, M, ...) \ do { \ CONTEXT_TYPE *_context = (context); \ @@ -33,7 +27,7 @@ LOG_REDIS_ERROR(_context, M, ##__VA_ARGS__); \ exit(-1); \ } \ - } while (0); + } while (0) #define REDIS_CALLBACK_HEADER(DB, CB_DATA, REPLY) \ if ((REPLY) == NULL) { \ @@ -45,7 +39,9 @@ if (CB_DATA == NULL) \ /* the callback data structure has been \ * already freed; just ignore this reply */ \ - return; + return; \ + do { \ + } while (0) db_handle *db_connect(const char *address, int port, @@ -60,7 +56,7 @@ db_handle *db_connect(const char *address, address, port); /* Add new client using optimistic locking. */ db_client_id client = globally_unique_id(); - while (1) { + while (true) { reply = redisCommand(context, "WATCH %s", client_type); freeReplyObject(reply); reply = redisCommand(context, "HLEN %s", client_type); @@ -70,11 +66,11 @@ db_handle *db_connect(const char *address, reply = redisCommand( context, "HMSET db_clients:%b client_type %s address %s:%d db_client_id %b", - (char *) client.id, sizeof(db_client_id), client_type, client_addr, - client_port, (char *) client.id, sizeof(db_client_id)); + (char *) client.id, sizeof(client.id), client_type, client_addr, + client_port, (char *) client.id, sizeof(client.id)); freeReplyObject(reply); reply = redisCommand(context, "PUBLISH db_clients %b:%s", - (char *) client.id, sizeof(db_client_id), client_type); + (char *) client.id, sizeof(client.id), client_type); freeReplyObject(reply); reply = redisCommand(context, "EXEC"); CHECK(reply); @@ -193,7 +189,7 @@ task *parse_redis_task_table_entry(task_id id, void redis_object_table_add_callback(redisAsyncContext *c, void *r, void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); if (callback_data->done_callback) { task_table_done_callback done_callback = callback_data->done_callback; @@ -208,8 +204,8 @@ void redis_object_table_add(table_callback_data *callback_data) { object_id id = callback_data->id; int status = redisAsyncCommand(db->context, redis_object_table_add_callback, (void *) callback_data->timer_id, - "SADD obj:%b %b", id.id, sizeof(object_id), - (char *) db->client.id, sizeof(db_client_id)); + "SADD obj:%b %b", id.id, sizeof(id.id), + (char *) db->client.id, sizeof(db->client.id)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "could not add object_table entry"); @@ -224,7 +220,7 @@ void redis_object_table_lookup(table_callback_data *callback_data) { object_id id = callback_data->id; int status = redisAsyncCommand(db->context, redis_object_table_get_entry, (void *) callback_data->timer_id, - "SMEMBERS obj:%b", id.id, sizeof(object_id)); + "SMEMBERS obj:%b", id.id, sizeof(id.id)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "error in object_table lookup"); } @@ -233,7 +229,7 @@ void redis_object_table_lookup(table_callback_data *callback_data) { void redis_result_table_add_callback(redisAsyncContext *c, void *r, void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; CHECK(reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_INTEGER); @@ -252,10 +248,10 @@ void redis_result_table_add(table_callback_data *callback_data) { object_id id = callback_data->id; task_id *result_task_id = (task_id *) callback_data->data; /* Add the result entry to the result table. */ - int status = redisAsyncCommand(db->context, redis_result_table_add_callback, - (void *) callback_data->timer_id, - "SET result:%b %b", id.id, sizeof(object_id), - (*result_task_id).id, sizeof(task_id)); + int status = redisAsyncCommand( + db->context, redis_result_table_add_callback, + (void *) callback_data->timer_id, "SET result:%b %b", id.id, + sizeof(id.id), result_task_id->id, sizeof(result_task_id->id)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "Error in result table add"); } @@ -264,7 +260,7 @@ void redis_result_table_add(table_callback_data *callback_data) { void redis_result_table_lookup_task_callback(redisAsyncContext *c, void *r, void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; /* Check that we received a Redis hashmap. */ if (reply->type != REDIS_REPLY_ARRAY) { @@ -288,7 +284,7 @@ void redis_result_table_lookup_task_callback(redisAsyncContext *c, void redis_result_table_lookup_object_callback(redisAsyncContext *c, void *r, void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; if (reply->type == REDIS_REPLY_STRING) { @@ -300,7 +296,7 @@ void redis_result_table_lookup_object_callback(redisAsyncContext *c, int status = redisAsyncCommand(db->context, redis_result_table_lookup_task_callback, (void *) callback_data->timer_id, "HGETALL task:%b", - (*result_task_id).id, sizeof(task_id)); + result_task_id->id, sizeof(result_task_id->id)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "Could not look up result table entry"); } @@ -323,10 +319,9 @@ void redis_result_table_lookup(table_callback_data *callback_data) { db_handle *db = callback_data->db_handle; /* First, lookup the ID of the task that created this object. */ object_id id = callback_data->id; - int status = - redisAsyncCommand(db->context, redis_result_table_lookup_object_callback, - (void *) callback_data->timer_id, "GET result:%b", - id.id, sizeof(object_id)); + int status = redisAsyncCommand( + db->context, redis_result_table_lookup_object_callback, + (void *) callback_data->timer_id, "GET result:%b", id.id, sizeof(id.id)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "Error in result table lookup"); } @@ -350,7 +345,7 @@ void redis_get_cached_db_client(db_handle *db, /* This is a very rare case. It should happen at most once per db client. */ redisReply *reply = redisCommand(db->sync_context, "HGET db_clients:%b address", - (char *) db_client_id.id, sizeof(db_client_id)); + (char *) db_client_id.id, sizeof(db_client_id.id)); CHECK(reply->type == REDIS_REPLY_STRING); entry = malloc(sizeof(db_client_cache_entry)); entry->db_client_id = db_client_id; @@ -365,7 +360,7 @@ void redis_get_cached_db_client(db_handle *db, void redis_object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; db_client_id *managers = malloc(reply->elements * sizeof(db_client_id)); @@ -375,7 +370,7 @@ void redis_object_table_get_entry(redisAsyncContext *c, const char **manager_vector = malloc(manager_count * sizeof(char *)); for (int j = 0; j < reply->elements; ++j) { CHECK(reply->element[j]->type == REDIS_REPLY_STRING); - memcpy(managers[j].id, reply->element[j]->str, sizeof(db_client_id)); + memcpy(managers[j].id, reply->element[j]->str, sizeof(managers[j].id)); redis_get_cached_db_client(db, managers[j], manager_vector + j); } @@ -394,7 +389,7 @@ void redis_object_table_get_entry(redisAsyncContext *c, void object_table_redis_callback(redisAsyncContext *c, void *r, void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; CHECK(reply->type == REDIS_REPLY_ARRAY); @@ -426,7 +421,7 @@ void redis_object_table_subscribe(table_callback_data *callback_data) { int status = redisAsyncCommand(db->sub_context, object_table_redis_callback, (void *) callback_data->timer_id, "SUBSCRIBE __keyspace@0__:%b add", id.id, - sizeof(object_id)); + sizeof(id.id)); if ((status == REDIS_ERR) || db->sub_context->err) { LOG_REDIS_DEBUG(db->sub_context, "error in redis_object_table_subscribe_callback"); @@ -440,7 +435,7 @@ void redis_object_table_subscribe(table_callback_data *callback_data) { void redis_task_table_get_task_callback(redisAsyncContext *c, void *r, void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; /* Check that we received a Redis hashmap. */ if (reply->type != REDIS_REPLY_ARRAY) { @@ -466,9 +461,9 @@ void redis_task_table_get_task(table_callback_data *callback_data) { int status = redisAsyncCommand(db->context, redis_task_table_get_task_callback, (void *) callback_data->timer_id, "HGETALL task:%b", - id.id, sizeof(task_id)); - if ((status == REDIS_ERR) || db->sub_context->err) { - LOG_REDIS_DEBUG(db->sub_context, "Could not get task from task table"); + id.id, sizeof(id.id)); + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_DEBUG(db->context, "Could not get task from task table"); } } @@ -510,14 +505,14 @@ void redis_task_table_publish(table_callback_data *callback_data, status = redisAsyncCommand( db->context, redis_task_table_publish_push_callback, (void *) callback_data->timer_id, "HMSET task:%b state %d node %b", - (char *) id.id, sizeof(task_id), state, (char *) node.id, - sizeof(node_id)); + (char *) id.id, sizeof(id.id), state, (char *) node.id, + sizeof(node.id)); } else { status = redisAsyncCommand( db->context, redis_task_table_publish_push_callback, (void *) callback_data->timer_id, "HMSET task:%b state %d node %b task_spec %b", (char *) id.id, - sizeof(task_id), state, (char *) node.id, sizeof(node_id), + sizeof(id.id), state, (char *) node.id, sizeof(node.id), (char *) spec, task_spec_size(spec)); } if ((status = REDIS_ERR) || db->context->err) { @@ -529,7 +524,7 @@ void redis_task_table_publish(table_callback_data *callback_data, int status = redisAsyncCommand( db->context, redis_task_table_publish_publish_callback, (void *) callback_data->timer_id, "PUBLISH task:%b:%d %b", - (char *) node.id, sizeof(node_id), state, (char *) task, + (char *) node.id, sizeof(node.id), state, (char *) task, task_size(task)); if ((status == REDIS_ERR) || db->context->err) { @@ -551,7 +546,7 @@ void redis_task_table_publish_push_callback(redisAsyncContext *c, void *r, void *privdata) { LOG_DEBUG("Calling publish push callback"); - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); CHECK(callback_data->requests_info != NULL); ((bool *) callback_data->requests_info)[PUSH_INDEX] = true; @@ -568,7 +563,7 @@ void redis_task_table_publish_publish_callback(redisAsyncContext *c, void *r, void *privdata) { LOG_DEBUG("Calling publish publish callback"); - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); CHECK(callback_data->requests_info != NULL); ((bool *) callback_data->requests_info)[PUBLISH_INDEX] = true; @@ -584,7 +579,7 @@ void redis_task_table_publish_publish_callback(redisAsyncContext *c, void redis_task_table_subscribe_callback(redisAsyncContext *c, void *r, void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; CHECK(reply->type == REDIS_REPLY_ARRAY); @@ -631,7 +626,7 @@ void redis_task_table_subscribe(table_callback_data *callback_data) { status = redisAsyncCommand( db->sub_context, redis_task_table_subscribe_callback, (void *) callback_data->timer_id, "SUBSCRIBE task:%b:%d", - (char *) node.id, sizeof(node_id), data->state_filter); + (char *) node.id, sizeof(node.id), data->state_filter); } if ((status == REDIS_ERR) || db->sub_context->err) { LOG_REDIS_DEBUG(db->sub_context, "error in task_table_register_callback"); @@ -645,7 +640,7 @@ void redis_task_table_subscribe(table_callback_data *callback_data) { void redis_db_client_table_subscribe_callback(redisAsyncContext *c, void *r, void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) + REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; CHECK(reply->type == REDIS_REPLY_ARRAY); @@ -669,13 +664,12 @@ void redis_db_client_table_subscribe_callback(redisAsyncContext *c, /* Otherwise, parse the payload and call the callback. */ db_client_table_subscribe_data *data = callback_data->data; db_client_id client; - memcpy(client.id, payload->str, sizeof(db_client_id)); - /* We subtract 1 + sizeof(db_client_id) to compute the length of the + memcpy(client.id, payload->str, sizeof(client.id)); + /* We subtract 1 + sizeof(client.id) to compute the length of the * client_type string, and we add 1 to null-terminate the string. */ - int client_type_length = payload->len - 1 - sizeof(db_client_id) + 1; + int client_type_length = payload->len - 1 - sizeof(client.id) + 1; char *client_type = malloc(client_type_length); - memcpy(client_type, &payload->str[1 + sizeof(db_client_id)], - client_type_length); + memcpy(client_type, &payload->str[1 + sizeof(client.id)], client_type_length); if (data->subscribe_callback) { data->subscribe_callback(client, client_type, data->subscribe_context); } diff --git a/src/common/state/redis.h b/src/common/state/redis.h index 0b4742df4..ee01ddc86 100644 --- a/src/common/state/redis.h +++ b/src/common/state/redis.h @@ -10,6 +10,12 @@ #include "uthash.h" #include "utarray.h" +#define LOG_REDIS_ERROR(context, M, ...) \ + LOG_ERROR("Redis error %d %s; %s", context->err, context->errstr, M) + +#define LOG_REDIS_DEBUG(context, M, ...) \ + LOG_DEBUG("Redis error %d %s; %s", context->err, context->errstr, M) + typedef struct { /** Unique ID for this db client. */ db_client_id db_client_id; diff --git a/src/common/task.c b/src/common/task.c index d7e8102dc..aed51c906 100644 --- a/src/common/task.c +++ b/src/common/task.c @@ -108,7 +108,7 @@ task_id compute_task_id(task_spec *spec) { /* Create a task ID out of the hash. This will truncate the hash. */ task_id task_id; CHECK(sizeof(task_id) <= SHA256_BLOCK_SIZE); - memcpy(&task_id.id, buff, sizeof(task_id)); + memcpy(&task_id.id, buff, sizeof(task_id.id)); return task_id; } diff --git a/src/common/test/redis_tests.c b/src/common/test/redis_tests.c index f51e5457c..2e271b5bd 100644 --- a/src/common/test/redis_tests.c +++ b/src/common/test/redis_tests.c @@ -150,7 +150,7 @@ void logging_read_callback(event_loop *loop, db_handle *conn = context; char *cmd = read_log_message(fd); redisAsyncCommand(conn->context, logging_test_callback, NULL, cmd, - (char *) conn->client.id, sizeof(db_client_id)); + (char *) conn->client.id, sizeof(conn->client.id)); free(cmd); } diff --git a/src/global_scheduler/global_scheduler_algorithm.c b/src/global_scheduler/global_scheduler_algorithm.c index 7c2415068..362494ba2 100644 --- a/src/global_scheduler/global_scheduler_algorithm.c +++ b/src/global_scheduler/global_scheduler_algorithm.c @@ -24,6 +24,8 @@ void handle_local_scheduler_heartbeat(global_scheduler_state *state) { void handle_new_local_scheduler(global_scheduler_state *state, db_client_id db_client_id) { - local_scheduler local_scheduler = {.id = db_client_id}; + local_scheduler local_scheduler; + memset(&local_scheduler, 0, sizeof(local_scheduler)); + local_scheduler.id = db_client_id; utarray_push_back(state->local_schedulers, &local_scheduler); } diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 1da91a13b..724bfe30f 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -9,11 +9,7 @@ #include "photon_scheduler.h" /* TODO(swang): We should set retry values in a config file somewhere. */ -const retry_info photon_retry = { - .num_retries = 0, - .timeout = 1000, - .fail_callback = NULL, -}; +const retry_info photon_retry = {0, 1000, NULL}; typedef struct task_queue_entry { /** The task that is queued. */ @@ -85,7 +81,7 @@ bool can_run(scheduler_state *s, task_spec *task) { if (task_arg_type(task, i) == ARG_BY_REF) { object_id obj_id = task_arg_id(task, i); available_object *entry; - HASH_FIND(handle, s->local_objects, &obj_id, sizeof(object_id), entry); + HASH_FIND(handle, s->local_objects, &obj_id, sizeof(obj_id), entry); if (entry == NULL) { /* The object is not present locally, so this task cannot be scheduled * right now. */ @@ -149,7 +145,7 @@ void queue_task_locally(scheduler_info *info, /* Copy the spec and add it to the task queue. The allocated spec will be * freed when it is assigned to a worker. */ task_queue_entry *elt = malloc(sizeof(task_queue_entry)); - elt->spec = malloc(task_spec_size(spec)); + elt->spec = (task_spec *) malloc(task_spec_size(spec)); memcpy(elt->spec, spec, task_spec_size(spec)); elt->from_global_scheduler = from_global_scheduler; DL_APPEND(s->task_queue, elt); diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 7fcff070f..02971d1bc 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -110,9 +110,11 @@ void assign_task_to_worker(scheduler_info *info, write_message(w->sock, EXECUTE_TASK, task_spec_size(spec), (uint8_t *) spec); /* Update the global task table. */ if (info->db != NULL) { - retry_info retry = { - .num_retries = 0, .timeout = 100, .fail_callback = NULL, - }; + retry_info retry; + memset(&retry, 0, sizeof(retry)); + retry.num_retries = 0; + retry.timeout = 100; + retry.fail_callback = NULL; task *task = alloc_task(spec, TASK_STATUS_RUNNING, get_db_client_id(info->db)); if (from_global_scheduler) { @@ -131,7 +133,7 @@ void process_plasma_notification(event_loop *loop, local_scheduler_state *s = context; /* Read the notification from Plasma. */ object_id obj_id; - recv(client_sock, &obj_id, sizeof(object_id), 0); + recv(client_sock, &obj_id, sizeof(obj_id), 0); handle_object_available(s->scheduler_info, s->scheduler_state, obj_id); } @@ -219,9 +221,11 @@ void start_server(const char *socket_name, * local scheduler by the global scheduler. TODO(rkn): we also need to get any * tasks that were assigned to this local scheduler before the call to * subscribe. */ - retry_info retry = { - .num_retries = 0, .timeout = 100, .fail_callback = NULL, - }; + retry_info retry; + memset(&retry, 0, sizeof(retry)); + retry.num_retries = 0; + retry.timeout = 100; + retry.fail_callback = NULL; if (g_state->scheduler_info->db != NULL) { task_table_subscribe(g_state->scheduler_info->db, get_db_client_id(g_state->scheduler_info->db), @@ -270,8 +274,9 @@ int main(int argc, char *argv[]) { /* Parse the Redis address into an IP address and a port. */ char redis_addr[16] = {0}; char redis_port[6] = {0}; - if (sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) != - 2) { + int num_assigned = + sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port); + if (num_assigned != 2) { LOG_FATAL( "if a redis address is provided with the -r switch, it should be " "formatted like 127.0.0.1:6379"); diff --git a/src/plasma/eviction_policy.c b/src/plasma/eviction_policy.c index 562bbb372..77ed07b71 100644 --- a/src/plasma/eviction_policy.c +++ b/src/plasma/eviction_policy.c @@ -126,7 +126,7 @@ int64_t choose_objects_to_evict(eviction_state *eviction_state, /* Find the object table entry for this object. */ object_table_entry *entry; HASH_FIND(handle, plasma_store_info->objects, &element->object_id, - sizeof(object_id), entry); + sizeof(element->object_id), entry); /* Update the cumulative bytes and the number of objects so far. */ num_bytes += (entry->info.data_size + entry->info.metadata_size); num_objects += 1; diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index 1eb713ff5..c0222a408 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -300,7 +300,7 @@ void plasma_contains(plasma_connection *conn, plasma_request req = make_plasma_request(object_id); plasma_send_request(conn->store_conn, PLASMA_CONTAINS, &req); plasma_reply reply; - int r = read(conn->store_conn, &reply, sizeof(plasma_reply)); + int r = read(conn->store_conn, &reply, sizeof(reply)); CHECKM(r != -1, "read error"); CHECKM(r != 0, "connection disconnected"); *has_object = reply.has_object; @@ -325,7 +325,7 @@ int64_t plasma_evict(plasma_connection *conn, int64_t num_bytes) { plasma_send_request(conn->store_conn, PLASMA_EVICT, &req); /* Wait for a response with the number of bytes actually evicted. */ plasma_reply reply; - int r = read(conn->store_conn, &reply, sizeof(plasma_reply)); + int r = read(conn->store_conn, &reply, sizeof(reply)); CHECKM(r != -1, "read error"); CHECKM(r != 0, "connection disconnected"); return reply.num_bytes; @@ -498,8 +498,7 @@ void plasma_fetch(plasma_connection *conn, /* Update the correct index in is_fetched. */ int i = 0; for (; i < num_object_ids; i++) { - if (memcmp(&object_ids[i], &reply.object_ids[0], sizeof(object_id)) == - 0) { + if (object_ids_equal(object_ids[i], reply.object_ids[0])) { /* Check that this isn't a duplicate response. */ CHECK(!is_fetched[i]); is_fetched[i] = success; diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 2c82b8dc9..ea4b1aa3d 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -151,8 +151,8 @@ int send_client_reply(client_connection *conn, plasma_reply *reply) { CHECK(conn->num_return_objects >= 0); --conn->num_return_objects; /* TODO(swang): Handle errors in write. */ - int n = write(conn->fd, (uint8_t *) reply, sizeof(plasma_reply)); - return (n != sizeof(plasma_reply)); + int n = write(conn->fd, (uint8_t *) reply, sizeof(*reply)); + return (n != sizeof(*reply)); } int send_client_failure_reply(object_id object_id, client_connection *conn) { diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index ed250b94a..8f6c85f92 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -40,7 +40,7 @@ void dlfree(void *); * This is used by the Plasma Store to send a reply to the Plasma Client. */ void plasma_send_reply(int fd, plasma_reply *reply) { - int reply_count = sizeof(plasma_reply); + int reply_count = sizeof(*reply); if (write(fd, reply, reply_count) != reply_count) { LOG_FATAL("write error, fd = %d", fd); } @@ -151,7 +151,7 @@ void create_object(client *client_context, object_table_entry *entry; /* TODO(swang): Return these error to the client instead of exiting. */ HASH_FIND(handle, plasma_state->plasma_store_info->objects, &obj_id, - sizeof(object_id), entry); + sizeof(obj_id), entry); CHECKM(entry == NULL, "Cannot create object twice."); /* Tell the eviction policy how much space we need to create this object. */ int64_t num_objects_to_evict; @@ -169,7 +169,7 @@ void create_object(client *client_context, assert(fd != -1); entry = malloc(sizeof(object_table_entry)); - memcpy(&entry->object_id, &obj_id, sizeof(object_id)); + memcpy(&entry->object_id, &obj_id, sizeof(entry->object_id)); entry->info.data_size = data_size; entry->info.metadata_size = metadata_size; entry->pointer = pointer; @@ -225,7 +225,7 @@ int get_object(client *client_context, notify_entry = malloc(sizeof(object_notify_entry)); memset(notify_entry, 0, sizeof(object_notify_entry)); utarray_new(notify_entry->waiting_clients, &client_icd); - memcpy(¬ify_entry->object_id, &object_id, sizeof(object_id)); + notify_entry->object_id = object_id; HASH_ADD(handle, plasma_state->objects_notify, object_id, sizeof(object_id), notify_entry); } @@ -380,9 +380,9 @@ void send_notifications(event_loop *loop, for (int i = 0; i < utarray_len(queue->object_ids); ++i) { object_id *obj_id = (object_id *) utarray_eltptr(queue->object_ids, i); /* Attempt to send a notification about this object ID. */ - int nbytes = send(client_sock, obj_id, sizeof(object_id), 0); + int nbytes = send(client_sock, obj_id, sizeof(*obj_id), 0); if (nbytes >= 0) { - CHECK(nbytes == sizeof(object_id)); + CHECK(nbytes == sizeof(*obj_id)); } else if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { LOG_DEBUG( "The socket's send buffer is full, so we are caching this " diff --git a/src/plasma/test/manager_tests.c b/src/plasma/test/manager_tests.c index 2c1311311..f33e6d7b1 100644 --- a/src/plasma/test/manager_tests.c +++ b/src/plasma/test/manager_tests.c @@ -163,7 +163,7 @@ TEST request_transfer_test(void) { read_message(read_fd, &type, &length, (uint8_t **) &req); ASSERT(type == PLASMA_TRANSFER); ASSERT(req->num_object_ids == 1); - ASSERT(memcmp(&oid, &req->object_ids[0], sizeof(object_id)) == 0); + ASSERT(object_ids_equal(oid, req->object_ids[0])); /* Clean up. */ utstring_free(addr); free(req); @@ -209,7 +209,7 @@ TEST request_transfer_retry_test(void) { read_message(read_fd, &type, &length, (uint8_t **) &req); ASSERT(type == PLASMA_TRANSFER); ASSERT(req->num_object_ids == 1); - ASSERT(memcmp(&oid, &req->object_ids[0], sizeof(object_id)) == 0); + ASSERT(object_ids_equal(oid, req->object_ids[0])); /* Clean up. */ utstring_free(addr0); utstring_free(addr1); @@ -249,7 +249,7 @@ TEST request_transfer_timeout_test(void) { int nbytes = recv(manager_fd, (uint8_t *) &reply, sizeof(reply), MSG_WAITALL); ASSERT_EQ(nbytes, sizeof(reply)); ASSERT_EQ(reply.num_object_ids, 1); - ASSERT_EQ(memcmp(&oid, &reply.object_ids, sizeof(object_id)), 0); + ASSERT(object_ids_equal(oid, reply.object_ids[0])); ASSERT_EQ(reply.has_object, 0); /* Clean up. */ utstring_free(addr); @@ -306,7 +306,7 @@ TEST read_write_object_chunk_test(void) { } SUITE(plasma_manager_tests) { - memset(&oid, 1, sizeof(object_id)); + memset(&oid, 1, sizeof(oid)); RUN_TEST(request_transfer_test); RUN_TEST(request_transfer_retry_test); RUN_TEST(request_transfer_timeout_test);