mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 08:31:42 +08:00
Implemented db_client_cache as unordered_map (#921)
* Implemented db_client_cache as unordered_map * Fix for memory leak * Fixed linting
This commit is contained in:
committed by
Philipp Moritz
parent
246be812f0
commit
2c19ae97a3
+12
-16
@@ -278,7 +278,6 @@ DBHandle *db_connect(const std::string &db_primary_address,
|
||||
|
||||
db->client_type = strdup(client_type);
|
||||
db->client = client;
|
||||
db->db_client_cache = NULL;
|
||||
|
||||
redisAsyncContext *context;
|
||||
redisAsyncContext *subscribe_context;
|
||||
@@ -324,12 +323,11 @@ void DBHandle_free(DBHandle *db) {
|
||||
}
|
||||
|
||||
/* Clean up memory. */
|
||||
DBClientCacheEntry *e, *tmp;
|
||||
HASH_ITER(hh, db->db_client_cache, e, tmp) {
|
||||
free(e->addr);
|
||||
HASH_DELETE(hh, db->db_client_cache, e);
|
||||
free(e);
|
||||
for (auto it = db->db_client_cache.begin(); it != db->db_client_cache.end();
|
||||
it = db->db_client_cache.erase(it)) {
|
||||
free(it->second);
|
||||
}
|
||||
|
||||
free(db->client_type);
|
||||
delete db;
|
||||
}
|
||||
@@ -607,24 +605,22 @@ void redis_result_table_lookup(TableCallbackData *callback_data) {
|
||||
void redis_get_cached_db_client(DBHandle *db,
|
||||
DBClientID db_client_id,
|
||||
const char **manager) {
|
||||
DBClientCacheEntry *entry;
|
||||
HASH_FIND(hh, db->db_client_cache, &db_client_id, sizeof(db_client_id),
|
||||
entry);
|
||||
if (!entry) {
|
||||
auto it = db->db_client_cache.find(db_client_id);
|
||||
|
||||
if (it == db->db_client_cache.end()) {
|
||||
/* This is a very rare case. It should happen at most once per db client. */
|
||||
redisReply *reply = (redisReply *) redisCommand(
|
||||
db->sync_context, "RAY.GET_CLIENT_ADDRESS %b", (char *) db_client_id.id,
|
||||
sizeof(db_client_id.id));
|
||||
CHECKM(reply->type == REDIS_REPLY_STRING, "REDIS reply type=%d, str=%s",
|
||||
reply->type, reply->str);
|
||||
entry = (DBClientCacheEntry *) malloc(sizeof(DBClientCacheEntry));
|
||||
entry->db_client_id = db_client_id;
|
||||
entry->addr = strdup(reply->str);
|
||||
HASH_ADD(hh, db->db_client_cache, db_client_id, sizeof(db_client_id),
|
||||
entry);
|
||||
char *addr = strdup(reply->str);
|
||||
freeReplyObject(reply);
|
||||
db->db_client_cache[db_client_id] = addr;
|
||||
*manager = addr;
|
||||
} else {
|
||||
*manager = it->second;
|
||||
}
|
||||
*manager = entry->addr;
|
||||
}
|
||||
|
||||
void redis_object_table_lookup_callback(redisAsyncContext *c,
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
#ifndef REDIS_H
|
||||
#define REDIS_H
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
#include "db.h"
|
||||
#include "object_table.h"
|
||||
#include "task_table.h"
|
||||
|
||||
#include "hiredis/hiredis.h"
|
||||
#include "hiredis/async.h"
|
||||
#include "uthash.h"
|
||||
#include "utarray.h"
|
||||
|
||||
/* Allow up to 5 seconds for connecting to Redis. */
|
||||
#define REDIS_DB_CONNECT_RETRIES 50
|
||||
@@ -20,15 +20,6 @@
|
||||
#define LOG_REDIS_DEBUG(context, M, ...) \
|
||||
LOG_DEBUG("Redis error %d %s; %s", context->err, context->errstr, M)
|
||||
|
||||
typedef struct {
|
||||
/** Unique ID for this db client. */
|
||||
DBClientID db_client_id;
|
||||
/** IP address and port of this db client. */
|
||||
char *addr;
|
||||
/** Handle for the uthash table. */
|
||||
UT_hash_handle hh;
|
||||
} DBClientCacheEntry;
|
||||
|
||||
struct DBHandle {
|
||||
/** String that identifies this client type. */
|
||||
char *client_type;
|
||||
@@ -56,9 +47,9 @@ struct DBHandle {
|
||||
event_loop *loop;
|
||||
/** Index of the database connection in the event loop */
|
||||
int64_t db_index;
|
||||
/** Cache for the IP addresses of db clients. This is a hash table mapping
|
||||
/** Cache for the IP addresses of db clients. This is an unordered map mapping
|
||||
* client IDs to addresses. */
|
||||
DBClientCacheEntry *db_client_cache;
|
||||
std::unordered_map<DBClientID, char *, UniqueIDHasher> db_client_cache;
|
||||
/** Redis context for synchronous connections. This should only be used very
|
||||
* rarely, it is not asynchronous. */
|
||||
redisContext *sync_context;
|
||||
|
||||
Reference in New Issue
Block a user