Warn the user when a nondeterministic task is detected. (#339)

* WARN instead of FATAL for object hash mismatches, push error to driver

* Document the callback signature for object_table_add/remove

* Error table

* Wait for all errors in python test

* Fix doc

* Fix state test
This commit is contained in:
Stephanie Wang
2017-03-07 00:32:15 -08:00
committed by Robert Nishihara
parent 0b8d279ef2
commit da06b4db82
15 changed files with 303 additions and 78 deletions
+1
View File
@@ -46,6 +46,7 @@ add_library(common STATIC
state/db_client_table.cc
state/actor_notification_table.cc
state/local_scheduler_table.cc
state/error_table.cc
thirdparty/ae/ae.c
thirdparty/sha256.c)
+13 -10
View File
@@ -388,9 +388,9 @@ bool PublishObjectNotification(RedisModuleCtx *ctx,
* @param hash_string A string which is a hash of the object.
* @param manager A string which represents the manager ID of the plasma manager
* that has the object.
* @return OK if the operation was successful and an error with string
* "hash mismatch" if the same object_id is already present with a
* different hash value.
* @return OK if the operation was successful. If the same object_id is already
* present with a different hash value, the entry is still added, but
* an error with string "hash mismatch" is returned.
*/
int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
@@ -416,6 +416,7 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
REDISMODULE_READ | REDISMODULE_WRITE);
/* Check if this object was already registered and if the hashes agree. */
bool hash_mismatch = false;
if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_EMPTY) {
RedisModuleString *existing_hash;
RedisModule_HashGet(key, REDISMODULE_HASH_CFIELDS, "hash", &existing_hash,
@@ -423,11 +424,9 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
/* The existing hash may be NULL even if the key is present because a call
* to RAY.RESULT_TABLE_ADD may have already created the key. */
if (existing_hash != NULL) {
if (RedisModule_StringCompare(existing_hash, new_hash) != 0) {
RedisModule_CloseKey(key);
RedisModule_FreeString(ctx, existing_hash);
return RedisModule_ReplyWithError(ctx, "hash mismatch");
}
/* Check whether the new hash value matches the old one. If not, we will
* later return the "hash mismatch" error. */
hash_mismatch = (RedisModule_StringCompare(existing_hash, new_hash) != 0);
RedisModule_FreeString(ctx, existing_hash);
}
}
@@ -493,8 +492,12 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
}
RedisModule_CloseKey(table_key);
RedisModule_ReplyWithSimpleString(ctx, "OK");
return REDISMODULE_OK;
if (hash_mismatch) {
return RedisModule_ReplyWithError(ctx, "hash mismatch");
} else {
RedisModule_ReplyWithSimpleString(ctx, "OK");
return REDISMODULE_OK;
}
}
/**
+23
View File
@@ -0,0 +1,23 @@
#include "error_table.h"
#include "redis.h"
void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_index,
size_t data_length,
unsigned char *data) {
CHECK(error_index >= 0 && error_index < MAX_ERROR_INDEX);
/* Allocate a struct to hold the error information. */
ErrorInfo *info = (ErrorInfo *) malloc(sizeof(ErrorInfo) + data_length);
info->driver_id = driver_id;
info->error_index = error_index;
info->data_length = data_length;
memcpy(info->data, data, data_length);
/* Generate a random key to identify this error message. */
CHECK(sizeof(info->error_key) >= UNIQUE_ID_SIZE);
UniqueID error_key = globally_unique_id();
memcpy(info->error_key, error_key.id, sizeof(info->error_key));
init_table_callback(db_handle, NIL_ID, __func__, info, NULL, NULL,
redis_push_error, NULL);
}
+50
View File
@@ -0,0 +1,50 @@
#ifndef ERROR_TABLE_H
#define ERROR_TABLE_H
#include "db.h"
#include "table.h"
typedef struct {
DBClientID driver_id;
unsigned char error_key[20];
int error_index;
size_t data_length;
unsigned char data[0];
} ErrorInfo;
/** An error_index may be used as an index into error_types and
* error_messages. */
typedef enum {
/** An object was added with a different hash from the existing
* one. */
OBJECT_HASH_MISMATCH_ERROR_INDEX = 0,
/** The total number of error types. */
MAX_ERROR_INDEX
} error_index;
/** Information about the error to be displayed to the user. */
static const char *error_types[] = {"object_hash_mismatch"};
static const char *error_messages[] = {
"A nondeterministic task was reexecuted."};
/**
* Push an error to the given Python driver.
*
* @param db_handle Database handle.
* @param driver_id The ID of the Python driver to push the error
* to.
* @param error_index The error information at this index in
* error_types and error_messages will be included in the
* error pushed to the driver.
* @param data_length The length of the custom data to be included
* in the error.
* @param data The custom data to be included in the error.
* @return Void.
*/
void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_index,
size_t data_length,
unsigned char *data);
#endif
+11 -1
View File
@@ -49,8 +49,18 @@ void object_table_lookup(DBHandle *db_handle,
* ==== Add object call and callback ====
*/
/* Callback called when the object add/remove operation completes. */
/**
* Callback called when the object add/remove operation completes.
*
* @param object_id The ID of the object that was added or removed.
* @param success Whether the operation was successful or not. If this is false
* and the operation was an addition, the object was added, but there
* was a hash mismatch.
* @param user_context The user context that was passed into the add/remove
* call.
*/
typedef void (*object_table_done_callback)(ObjectID object_id,
bool success,
void *user_context);
/**
+61 -6
View File
@@ -22,6 +22,7 @@ extern "C" {
#include "object_info.h"
#include "task.h"
#include "task_table.h"
#include "error_table.h"
#include "event_loop.h"
#include "redis.h"
#include "io.h"
@@ -217,21 +218,23 @@ void redis_object_table_add_callback(redisAsyncContext *c,
/* Do some minimal checking. */
redisReply *reply = (redisReply *) r;
if (strcmp(reply->str, "hash mismatch") == 0) {
bool success = (strcmp(reply->str, "hash mismatch") != 0);
if (!success) {
/* If our object hash doesn't match the one recorded in the table, report
* the error back to the user and exit immediately. */
LOG_FATAL(
LOG_WARN(
"Found objects with different value but same object ID, most likely "
"because a nondeterministic task was executed twice, either for "
"reconstruction or for speculation.");
} else {
CHECK(reply->type != REDIS_REPLY_ERROR);
CHECK(strcmp(reply->str, "OK") == 0);
}
CHECK(reply->type != REDIS_REPLY_ERROR);
CHECK(strcmp(reply->str, "OK") == 0);
/* Call the done callback if there is one. */
if (callback_data->done_callback != NULL) {
object_table_done_callback done_callback =
(object_table_done_callback) callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
done_callback(callback_data->id, success, callback_data->user_context);
}
/* Clean up the timer and callback. */
destroy_timer_callback(db->loop, callback_data);
@@ -274,7 +277,7 @@ void redis_object_table_remove_callback(redisAsyncContext *c,
if (callback_data->done_callback != NULL) {
object_table_done_callback done_callback =
(object_table_done_callback) callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
done_callback(callback_data->id, true, callback_data->user_context);
}
/* Clean up the timer and callback. */
destroy_timer_callback(db->loop, callback_data);
@@ -1275,6 +1278,58 @@ void redis_object_info_subscribe(TableCallbackData *callback_data) {
}
}
void redis_push_error_rpush_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);
redisReply *reply = (redisReply *) r;
/* The reply should be the length of the errors list after our RPUSH. */
CHECK(reply->type == REDIS_REPLY_INTEGER);
destroy_timer_callback(db->loop, callback_data);
}
void redis_push_error_hmset_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);
redisReply *reply = (redisReply *) r;
/* Make sure we were able to add the error information. */
CHECK(reply->type != REDIS_REPLY_ERROR);
CHECK(strcmp(reply->str, "OK") == 0);
/* Add the error to this driver's list of errors. */
ErrorInfo *info = (ErrorInfo *) callback_data->data;
int status = redisAsyncCommand(db->context, redis_push_error_rpush_callback,
(void *) callback_data->timer_id,
"RPUSH ErrorKeys Error:%b:%b",
info->driver_id.id, sizeof(info->driver_id.id),
info->error_key, sizeof(info->error_key));
if ((status == REDIS_ERR) || db->sub_context->err) {
LOG_REDIS_DEBUG(db->sub_context, "error in redis_push_error rpush");
}
}
void redis_push_error(TableCallbackData *callback_data) {
DBHandle *db = callback_data->db_handle;
ErrorInfo *info = (ErrorInfo *) callback_data->data;
CHECK(info->error_index < MAX_ERROR_INDEX && info->error_index >= 0);
/* Look up the error type. */
const char *error_type = error_types[info->error_index];
const char *error_message = error_messages[info->error_index];
/* Set the error information. */
int status = redisAsyncCommand(
db->context, redis_push_error_hmset_callback,
(void *) callback_data->timer_id,
"HMSET Error:%b:%b type %s message %s data %b", info->driver_id.id,
sizeof(info->driver_id.id), info->error_key, sizeof(info->error_key),
error_type, error_message, info->data, info->data_length);
if ((status == REDIS_ERR) || db->sub_context->err) {
LOG_REDIS_DEBUG(db->sub_context, "error in redis_push_error hmset");
}
}
DBClientID get_db_client_id(DBHandle *db) {
CHECK(db != NULL);
return db->client;
+2
View File
@@ -264,4 +264,6 @@ void redis_actor_notification_table_subscribe(TableCallbackData *callback_data);
void redis_object_info_subscribe(TableCallbackData *callback_data);
void redis_push_error(TableCallbackData *callback_data);
#endif /* REDIS_H */
+1 -1
View File
@@ -55,7 +55,7 @@ void lookup_done_callback(ObjectID object_id,
}
/* Entry added to database successfully. */
void add_done_callback(ObjectID object_id, void *user_context) {}
void add_done_callback(ObjectID object_id, bool success, void *user_context) {}
/* Test if we got a timeout callback if we couldn't connect database. */
void timeout_callback(ObjectID object_id, void *context, void *user_data) {
+12 -5
View File
@@ -172,7 +172,7 @@ TEST lookup_timeout_test(void) {
const char *add_timeout_context = "add_timeout";
int add_failed = 0;
void add_done_callback(ObjectID object_id, void *user_context) {
void add_done_callback(ObjectID object_id, bool success, void *user_context) {
/* The done callback should not be called. */
CHECK(0);
}
@@ -305,7 +305,8 @@ void add_lookup_done_callback(ObjectID object_id,
lookup_retry_succeeded = 1;
}
void add_lookup_callback(ObjectID object_id, void *user_context) {
void add_lookup_callback(ObjectID object_id, bool success, void *user_context) {
CHECK(success);
DBHandle *db = (DBHandle *) user_context;
RetryInfo retry = {
.num_retries = 5,
@@ -353,7 +354,10 @@ void add_remove_lookup_done_callback(ObjectID object_id,
lookup_retry_succeeded = 1;
}
void add_remove_lookup_callback(ObjectID object_id, void *user_context) {
void add_remove_lookup_callback(ObjectID object_id,
bool success,
void *user_context) {
CHECK(success);
DBHandle *db = (DBHandle *) user_context;
RetryInfo retry = {
.num_retries = 5,
@@ -364,7 +368,8 @@ void add_remove_lookup_callback(ObjectID object_id, void *user_context) {
(void *) lookup_retry_context);
}
void add_remove_callback(ObjectID object_id, void *user_context) {
void add_remove_callback(ObjectID object_id, bool success, void *user_context) {
CHECK(success);
DBHandle *db = (DBHandle *) user_context;
RetryInfo retry = {
.num_retries = 5,
@@ -482,7 +487,9 @@ void add_late_fail_callback(UniqueID id, void *user_context, void *user_data) {
add_late_failed = 1;
}
void add_late_done_callback(ObjectID object_id, void *user_context) {
void add_late_done_callback(ObjectID object_id,
bool success,
void *user_context) {
/* This function should never be called. */
CHECK(0);
}
@@ -351,7 +351,9 @@ TaskSpec *object_reconstruction_suppression_spec;
int64_t object_reconstruction_suppression_size;
void object_reconstruction_suppression_callback(ObjectID object_id,
bool success,
void *user_context) {
CHECK(success);
/* Submit the task after adding the object to the object table. */
LocalSchedulerConnection *worker = (LocalSchedulerConnection *) user_context;
local_scheduler_submit(worker, object_reconstruction_suppression_spec,
+44 -5
View File
@@ -35,6 +35,8 @@
#include "plasma_manager.h"
#include "state/db.h"
#include "state/object_table.h"
#include "state/error_table.h"
#include "state/task_table.h"
/**
* Process either the fetch or the status request.
@@ -1270,6 +1272,44 @@ void process_delete_object_notification(PlasmaManagerState *state,
* up-to-date. */
}
void log_object_hash_mismatch_error_task_callback(Task *task,
void *user_context) {
CHECK(task != NULL);
PlasmaManagerState *state = (PlasmaManagerState *) user_context;
TaskSpec *spec = Task_task_spec(task);
FunctionID function = TaskSpec_function(spec);
/* Push the error to the Python driver that caused the nondeterministic task
* to be submitted. */
push_error(state->db, TaskSpec_driver_id(spec),
OBJECT_HASH_MISMATCH_ERROR_INDEX, sizeof(function), function.id);
}
void log_object_hash_mismatch_error_result_callback(ObjectID object_id,
TaskID task_id,
void *user_context) {
CHECK(!IS_NIL_ID(task_id));
PlasmaManagerState *state = (PlasmaManagerState *) user_context;
/* Get the specification for the nondeterministic task. */
task_table_get_task(state->db, task_id, NULL,
log_object_hash_mismatch_error_task_callback, state);
}
void log_object_hash_mismatch_error_object_callback(ObjectID object_id,
bool success,
void *user_context) {
if (success) {
/* The object was added successfully. */
return;
}
/* The object was added, but there was an object hash mismatch. In this case,
* look up the task that created the object so we can notify the Python
* driver that the task is nondeterministic. */
PlasmaManagerState *state = (PlasmaManagerState *) user_context;
result_table_lookup(state->db, object_id, NULL,
log_object_hash_mismatch_error_result_callback, state);
}
void process_add_object_notification(PlasmaManagerState *state,
ObjectInfo object_info) {
ObjectID obj_id = object_info.obj_id;
@@ -1280,11 +1320,10 @@ void process_add_object_notification(PlasmaManagerState *state,
/* Add this object to the (redis) object table. */
if (state->db) {
/* TODO(swang): Log the error if we fail to add the object, and possibly
* retry later? */
object_table_add(state->db, obj_id,
object_info.data_size + object_info.metadata_size,
object_info.digest, NULL, NULL, NULL);
object_table_add(
state->db, obj_id, object_info.data_size + object_info.metadata_size,
object_info.digest, NULL,
log_object_hash_mismatch_error_object_callback, (void *) state);
}
/* If we were trying to fetch this object, finish up the fetch request. */