Adding object table (#1)

* code for maintaining the object table

* Makefile fix

* Clone git submodules.

* directory -> object_table

* Fix Makefile and remove unnecessary files.

* Fix formatting.

* make code more generic
This commit is contained in:
Philipp Moritz
2016-09-13 18:54:26 -07:00
committed by Robert Nishihara
parent a35de3b287
commit 7d629d4e48
21 changed files with 2322 additions and 0 deletions
+6
View File
@@ -0,0 +1,6 @@
BasedOnStyle: Chromium
DerivePointerAlignment: true
IndentCaseLabels: false
PointerAlignment: Right
SpaceAfterCStyleCast: true
+2
View File
@@ -1,3 +1,5 @@
*~
# Object files
*.o
*.ko
+3
View File
@@ -0,0 +1,3 @@
[submodule "thirdparty/hiredis"]
path = thirdparty/hiredis
url = https://github.com/redis/hiredis
+26
View File
@@ -0,0 +1,26 @@
sudo: required
language: generic
matrix:
include:
- os: linux
dist: trusty
- os: osx
osx_image: xcode7
- os: linux
dist: trusty
env: LINT=1
before_install:
# In case we ever want to use a different version of clang-format:
#- wget -O - http://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add -
#- echo "deb http://apt.llvm.org/trusty/ llvm-toolchain-trusty main" | sudo tee -a /etc/apt/sources.list > /dev/null
- sudo apt-get update -qq
- sudo apt-get install -qq clang-format-3.8
install: []
script:
- .travis/check-git-clang-format-output.sh
install:
- make
- make test
+18
View File
@@ -0,0 +1,18 @@
#!/bin/bash
if [ "$TRAVIS_PULL_REQUEST" == "false" ] ; then
# Not in a pull request, so compare against parent commit
base_commit="HEAD^"
echo "Running clang-format against parent commit $(git rev-parse $base_commit)"
else
base_commit="$TRAVIS_BRANCH"
echo "Running clang-format against branch $base_commit, with hash $(git rev-parse $base_commit)"
fi
output="$(.travis/git-clang-format --binary clang-format-3.8 --commit $base_commit --diff --exclude ^thirdparty/)"
if [ "$output" == "no modified files to format" ] || [ "$output" == "clang-format did not modify any files" ] ; then
echo "clang-format passed."
exit 0
else
echo "clang-format failed:"
echo "$output"
exit 1
fi
+476
View File
@@ -0,0 +1,476 @@
#!/usr/bin/env python
#
#===- git-clang-format - ClangFormat Git Integration ---------*- python -*--===#
#
# The LLVM Compiler Infrastructure
#
# This file is distributed under the University of Illinois Open Source
# License. See LICENSE.TXT for details.
#
#===------------------------------------------------------------------------===#
r"""
clang-format git integration
============================
This file provides a clang-format integration for git. Put it somewhere in your
path and ensure that it is executable. Then, "git clang-format" will invoke
clang-format on the changes in current files or a specific commit.
For further details, run:
git clang-format -h
Requires Python 2.7
"""
import argparse
import collections
import contextlib
import errno
import os
import re
import subprocess
import sys
usage = 'git clang-format [OPTIONS] [<commit>] [--] [<file>...]'
desc = '''
Run clang-format on all lines that differ between the working directory
and <commit>, which defaults to HEAD. Changes are only applied to the working
directory.
The following git-config settings set the default of the corresponding option:
clangFormat.binary
clangFormat.commit
clangFormat.extension
clangFormat.style
'''
# Name of the temporary index file in which save the output of clang-format.
# This file is created within the .git directory.
temp_index_basename = 'clang-format-index'
Range = collections.namedtuple('Range', 'start, count')
def main():
config = load_git_config()
# In order to keep '--' yet allow options after positionals, we need to
# check for '--' ourselves. (Setting nargs='*' throws away the '--', while
# nargs=argparse.REMAINDER disallows options after positionals.)
argv = sys.argv[1:]
try:
idx = argv.index('--')
except ValueError:
dash_dash = []
else:
dash_dash = argv[idx:]
argv = argv[:idx]
default_extensions = ','.join([
# From clang/lib/Frontend/FrontendOptions.cpp, all lower case
'c', 'h', # C
'm', # ObjC
'mm', # ObjC++
'cc', 'cp', 'cpp', 'c++', 'cxx', 'hpp', # C++
# Other languages that clang-format supports
'proto', 'protodevel', # Protocol Buffers
'js', # JavaScript
'ts', # TypeScript
])
p = argparse.ArgumentParser(
usage=usage, formatter_class=argparse.RawDescriptionHelpFormatter,
description=desc)
p.add_argument('--binary',
default=config.get('clangformat.binary', 'clang-format'),
help='path to clang-format'),
p.add_argument('--commit',
default=config.get('clangformat.commit', 'HEAD'),
help='default commit to use if none is specified'),
p.add_argument('--diff', action='store_true',
help='print a diff instead of applying the changes')
p.add_argument('--extensions',
default=config.get('clangformat.extensions',
default_extensions),
help=('comma-separated list of file extensions to format, '
'excluding the period and case-insensitive')),
p.add_argument('--exclude', help='Exclude files matching this regex.')
p.add_argument('-f', '--force', action='store_true',
help='allow changes to unstaged files')
p.add_argument('-p', '--patch', action='store_true',
help='select hunks interactively')
p.add_argument('-q', '--quiet', action='count', default=0,
help='print less information')
p.add_argument('--style',
default=config.get('clangformat.style', None),
help='passed to clang-format'),
p.add_argument('-v', '--verbose', action='count', default=0,
help='print extra information')
# We gather all the remaining positional arguments into 'args' since we need
# to use some heuristics to determine whether or not <commit> was present.
# However, to print pretty messages, we make use of metavar and help.
p.add_argument('args', nargs='*', metavar='<commit>',
help='revision from which to compute the diff')
p.add_argument('ignored', nargs='*', metavar='<file>...',
help='if specified, only consider differences in these files')
opts = p.parse_args(argv)
opts.verbose -= opts.quiet
del opts.quiet
commit, files = interpret_args(opts.args, dash_dash, opts.commit)
changed_lines = compute_diff_and_extract_lines(commit, files)
if opts.verbose >= 1:
ignored_files = set(changed_lines)
filter_by_extension(changed_lines, opts.extensions.lower().split(','))
if opts.exclude:
for filename in changed_lines.keys():
if re.match(opts.exclude, filename):
del changed_lines[filename]
if opts.verbose >= 1:
ignored_files.difference_update(changed_lines)
if ignored_files:
print 'Ignoring changes in the following files:'
for filename in ignored_files:
print ' ', filename
if changed_lines:
print 'Running clang-format on the following files:'
for filename in changed_lines:
print ' ', filename
if not changed_lines:
print 'no modified files to format'
return
# The computed diff outputs absolute paths, so we must cd before accessing
# those files.
cd_to_toplevel()
old_tree = create_tree_from_workdir(changed_lines)
new_tree = run_clang_format_and_save_to_tree(changed_lines,
binary=opts.binary,
style=opts.style)
if opts.verbose >= 1:
print 'old tree:', old_tree
print 'new tree:', new_tree
if old_tree == new_tree:
if opts.verbose >= 0:
print 'clang-format did not modify any files'
elif opts.diff:
print_diff(old_tree, new_tree)
else:
changed_files = apply_changes(old_tree, new_tree, force=opts.force,
patch_mode=opts.patch)
if (opts.verbose >= 0 and not opts.patch) or opts.verbose >= 1:
print 'changed files:'
for filename in changed_files:
print ' ', filename
def load_git_config(non_string_options=None):
"""Return the git configuration as a dictionary.
All options are assumed to be strings unless in `non_string_options`, in which
is a dictionary mapping option name (in lower case) to either "--bool" or
"--int"."""
if non_string_options is None:
non_string_options = {}
out = {}
for entry in run('git', 'config', '--list', '--null').split('\0'):
if entry:
name, value = entry.split('\n', 1)
if name in non_string_options:
value = run('git', 'config', non_string_options[name], name)
out[name] = value
return out
def interpret_args(args, dash_dash, default_commit):
"""Interpret `args` as "[commit] [--] [files...]" and return (commit, files).
It is assumed that "--" and everything that follows has been removed from
args and placed in `dash_dash`.
If "--" is present (i.e., `dash_dash` is non-empty), the argument to its
left (if present) is taken as commit. Otherwise, the first argument is
checked if it is a commit or a file. If commit is not given,
`default_commit` is used."""
if dash_dash:
if len(args) == 0:
commit = default_commit
elif len(args) > 1:
die('at most one commit allowed; %d given' % len(args))
else:
commit = args[0]
object_type = get_object_type(commit)
if object_type not in ('commit', 'tag'):
if object_type is None:
die("'%s' is not a commit" % commit)
else:
die("'%s' is a %s, but a commit was expected" % (commit, object_type))
files = dash_dash[1:]
elif args:
if disambiguate_revision(args[0]):
commit = args[0]
files = args[1:]
else:
commit = default_commit
files = args
else:
commit = default_commit
files = []
return commit, files
def disambiguate_revision(value):
"""Returns True if `value` is a revision, False if it is a file, or dies."""
# If `value` is ambiguous (neither a commit nor a file), the following
# command will die with an appropriate error message.
run('git', 'rev-parse', value, verbose=False)
object_type = get_object_type(value)
if object_type is None:
return False
if object_type in ('commit', 'tag'):
return True
die('`%s` is a %s, but a commit or filename was expected' %
(value, object_type))
def get_object_type(value):
"""Returns a string description of an object's type, or None if it is not
a valid git object."""
cmd = ['git', 'cat-file', '-t', value]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
return None
return stdout.strip()
def compute_diff_and_extract_lines(commit, files):
"""Calls compute_diff() followed by extract_lines()."""
diff_process = compute_diff(commit, files)
changed_lines = extract_lines(diff_process.stdout)
diff_process.stdout.close()
diff_process.wait()
if diff_process.returncode != 0:
# Assume error was already printed to stderr.
sys.exit(2)
return changed_lines
def compute_diff(commit, files):
"""Return a subprocess object producing the diff from `commit`.
The return value's `stdin` file object will produce a patch with the
differences between the working directory and `commit`, filtered on `files`
(if non-empty). Zero context lines are used in the patch."""
cmd = ['git', 'diff-index', '-p', '-U0', commit, '--']
cmd.extend(files)
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
p.stdin.close()
return p
def extract_lines(patch_file):
"""Extract the changed lines in `patch_file`.
The return value is a dictionary mapping filename to a list of (start_line,
line_count) pairs.
The input must have been produced with ``-U0``, meaning unidiff format with
zero lines of context. The return value is a dict mapping filename to a
list of line `Range`s."""
matches = {}
for line in patch_file:
match = re.search(r'^\+\+\+\ [^/]+/(.*)', line)
if match:
filename = match.group(1).rstrip('\r\n')
match = re.search(r'^@@ -[0-9,]+ \+(\d+)(,(\d+))?', line)
if match:
start_line = int(match.group(1))
line_count = 1
if match.group(3):
line_count = int(match.group(3))
if line_count > 0:
matches.setdefault(filename, []).append(Range(start_line, line_count))
return matches
def filter_by_extension(dictionary, allowed_extensions):
"""Delete every key in `dictionary` that doesn't have an allowed extension.
`allowed_extensions` must be a collection of lowercase file extensions,
excluding the period."""
allowed_extensions = frozenset(allowed_extensions)
for filename in dictionary.keys():
base_ext = filename.rsplit('.', 1)
if len(base_ext) == 1 or base_ext[1].lower() not in allowed_extensions:
del dictionary[filename]
def cd_to_toplevel():
"""Change to the top level of the git repository."""
toplevel = run('git', 'rev-parse', '--show-toplevel')
os.chdir(toplevel)
def create_tree_from_workdir(filenames):
"""Create a new git tree with the given files from the working directory.
Returns the object ID (SHA-1) of the created tree."""
return create_tree(filenames, '--stdin')
def run_clang_format_and_save_to_tree(changed_lines, binary='clang-format',
style=None):
"""Run clang-format on each file and save the result to a git tree.
Returns the object ID (SHA-1) of the created tree."""
def index_info_generator():
for filename, line_ranges in changed_lines.iteritems():
mode = oct(os.stat(filename).st_mode)
blob_id = clang_format_to_blob(filename, line_ranges, binary=binary,
style=style)
yield '%s %s\t%s' % (mode, blob_id, filename)
return create_tree(index_info_generator(), '--index-info')
def create_tree(input_lines, mode):
"""Create a tree object from the given input.
If mode is '--stdin', it must be a list of filenames. If mode is
'--index-info' is must be a list of values suitable for "git update-index
--index-info", such as "<mode> <SP> <sha1> <TAB> <filename>". Any other mode
is invalid."""
assert mode in ('--stdin', '--index-info')
cmd = ['git', 'update-index', '--add', '-z', mode]
with temporary_index_file():
p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
for line in input_lines:
p.stdin.write('%s\0' % line)
p.stdin.close()
if p.wait() != 0:
die('`%s` failed' % ' '.join(cmd))
tree_id = run('git', 'write-tree')
return tree_id
def clang_format_to_blob(filename, line_ranges, binary='clang-format',
style=None):
"""Run clang-format on the given file and save the result to a git blob.
Returns the object ID (SHA-1) of the created blob."""
clang_format_cmd = [binary, filename]
if style:
clang_format_cmd.extend(['-style='+style])
clang_format_cmd.extend([
'-lines=%s:%s' % (start_line, start_line+line_count-1)
for start_line, line_count in line_ranges])
try:
clang_format = subprocess.Popen(clang_format_cmd, stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
except OSError as e:
if e.errno == errno.ENOENT:
die('cannot find executable "%s"' % binary)
else:
raise
clang_format.stdin.close()
hash_object_cmd = ['git', 'hash-object', '-w', '--path='+filename, '--stdin']
hash_object = subprocess.Popen(hash_object_cmd, stdin=clang_format.stdout,
stdout=subprocess.PIPE)
clang_format.stdout.close()
stdout = hash_object.communicate()[0]
if hash_object.returncode != 0:
die('`%s` failed' % ' '.join(hash_object_cmd))
if clang_format.wait() != 0:
die('`%s` failed' % ' '.join(clang_format_cmd))
return stdout.rstrip('\r\n')
@contextlib.contextmanager
def temporary_index_file(tree=None):
"""Context manager for setting GIT_INDEX_FILE to a temporary file and deleting
the file afterward."""
index_path = create_temporary_index(tree)
old_index_path = os.environ.get('GIT_INDEX_FILE')
os.environ['GIT_INDEX_FILE'] = index_path
try:
yield
finally:
if old_index_path is None:
del os.environ['GIT_INDEX_FILE']
else:
os.environ['GIT_INDEX_FILE'] = old_index_path
os.remove(index_path)
def create_temporary_index(tree=None):
"""Create a temporary index file and return the created file's path.
If `tree` is not None, use that as the tree to read in. Otherwise, an
empty index is created."""
gitdir = run('git', 'rev-parse', '--git-dir')
path = os.path.join(gitdir, temp_index_basename)
if tree is None:
tree = '--empty'
run('git', 'read-tree', '--index-output='+path, tree)
return path
def print_diff(old_tree, new_tree):
"""Print the diff between the two trees to stdout."""
# We use the porcelain 'diff' and not plumbing 'diff-tree' because the output
# is expected to be viewed by the user, and only the former does nice things
# like color and pagination.
subprocess.check_call(['git', 'diff', old_tree, new_tree, '--'])
def apply_changes(old_tree, new_tree, force=False, patch_mode=False):
"""Apply the changes in `new_tree` to the working directory.
Bails if there are local changes in those files and not `force`. If
`patch_mode`, runs `git checkout --patch` to select hunks interactively."""
changed_files = run('git', 'diff-tree', '-r', '-z', '--name-only', old_tree,
new_tree).rstrip('\0').split('\0')
if not force:
unstaged_files = run('git', 'diff-files', '--name-status', *changed_files)
if unstaged_files:
print >>sys.stderr, ('The following files would be modified but '
'have unstaged changes:')
print >>sys.stderr, unstaged_files
print >>sys.stderr, 'Please commit, stage, or stash them first.'
sys.exit(2)
if patch_mode:
# In patch mode, we could just as well create an index from the new tree
# and checkout from that, but then the user will be presented with a
# message saying "Discard ... from worktree". Instead, we use the old
# tree as the index and checkout from new_tree, which gives the slightly
# better message, "Apply ... to index and worktree". This is not quite
# right, since it won't be applied to the user's index, but oh well.
with temporary_index_file(old_tree):
subprocess.check_call(['git', 'checkout', '--patch', new_tree])
index_tree = old_tree
else:
with temporary_index_file(new_tree):
run('git', 'checkout-index', '-a', '-f')
return changed_files
def run(*args, **kwargs):
stdin = kwargs.pop('stdin', '')
verbose = kwargs.pop('verbose', True)
strip = kwargs.pop('strip', True)
for name in kwargs:
raise TypeError("run() got an unexpected keyword argument '%s'" % name)
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
stdout, stderr = p.communicate(input=stdin)
if p.returncode == 0:
if stderr:
if verbose:
print >>sys.stderr, '`%s` printed to stderr:' % ' '.join(args)
print >>sys.stderr, stderr.rstrip()
if strip:
stdout = stdout.rstrip('\r\n')
return stdout
if verbose:
print >>sys.stderr, '`%s` returned %s' % (' '.join(args), p.returncode)
if stderr:
print >>sys.stderr, stderr.rstrip()
sys.exit(2)
def die(message):
print >>sys.stderr, 'error:', message
sys.exit(2)
if __name__ == '__main__':
main()
+24
View File
@@ -0,0 +1,24 @@
CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L
BUILD = build
CFLAGS += -Wmissing-prototypes
CFLAGS += -Wstrict-prototypes
CFLAGS += -Wmissing-declarations
$(BUILD)/db_tests: hiredis test/db_tests.c thirdparty/greatest.h event_loop.c state/redis.c common.c
$(CC) -o $@ test/db_tests.c event_loop.c state/redis.c common.c thirdparty/hiredis/libhiredis.a $(CFLAGS) -I. -Ithirdparty
clean:
rm -r $(BUILD)/*
redis:
cd thirdparty ; bash ./build-redis.sh
hiredis:
git submodule update --init --recursive -- "thirdparty/hiredis" ; cd thirdparty/hiredis ; make
test: hiredis redis $(BUILD)/db_tests FORCE
./thirdparty/redis-3.2.3/src/redis-server & sleep 1s ; ./build/db_tests
FORCE:
View File
+15
View File
@@ -0,0 +1,15 @@
#include "common.h"
char *sha1_to_hex(const unsigned char *sha1, char *buffer) {
static const char hex[] = "0123456789abcdef";
char *buf = buffer;
for (int i = 0; i < UNIQUE_ID_SIZE; i++) {
unsigned int val = *sha1++;
*buf++ = hex[val >> 4];
*buf++ = hex[val & 0xf];
}
*buf = '\0';
return buffer;
}
+29
View File
@@ -0,0 +1,29 @@
#ifndef COMMON_H
#define COMMON_H
#include <errno.h>
#ifdef NDEBUG
#define LOG_DEBUG(M, ...)
#else
#define LOG_DEBUG(M, ...) \
fprintf(stderr, "[DEBUG] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#endif
#define LOG_ERR(M, ...) \
fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, \
errno == 0 ? "None" : strerror(errno), ##__VA_ARGS__)
#define LOG_INFO(M, ...) \
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#define UNIQUE_ID_SIZE 20
typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id;
/* Convert a 20 byte sha1 hash to a hexdecimal string. This function assumes
* that buffer points to an already allocated char array of size 2 *
* UNIQUE_ID_SIZE + 1 */
char *sha1_to_hex(const unsigned char *sha1, char *buffer);
#endif
+92
View File
@@ -0,0 +1,92 @@
#include "event_loop.h"
#include <assert.h>
#include <unistd.h>
UT_icd item_icd = {sizeof(event_loop_item), NULL, NULL, NULL};
UT_icd poll_icd = {sizeof(struct pollfd), NULL, NULL, NULL};
/* Initializes the event loop.
* This function needs to be called before any other event loop function. */
void event_loop_init(event_loop *loop) {
utarray_new(loop->items, &item_icd);
utarray_new(loop->waiting, &poll_icd);
}
/* Add a new file descriptor fd to the event loop.
* This function sets a user defined type and id for the file descriptor
* which can be queried using event_loop_type and event_loop_id. The parameter
* events is the same as in http://linux.die.net/man/2/poll.
* Returns the index of the item in the event loop. */
int64_t event_loop_attach(event_loop *loop,
int type,
void *data,
int fd,
int events) {
assert(utarray_len(loop->items) == utarray_len(loop->waiting));
int64_t index = utarray_len(loop->items);
event_loop_item item = {.type = type, .data = data};
utarray_push_back(loop->items, &item);
struct pollfd waiting = {.fd = fd, .events = events};
utarray_push_back(loop->waiting, &waiting);
return index;
}
/* Detach a file descriptor from the event loop.
* This invalidates all other indices into the event loop items, but leaves
* the ids of the event loop items valid. */
void event_loop_detach(event_loop *loop, int64_t index, int shall_close) {
struct pollfd *waiting_item =
(struct pollfd *) utarray_eltptr(loop->waiting, index);
struct pollfd *waiting_back = (struct pollfd *) utarray_back(loop->waiting);
if (shall_close) {
close(waiting_item->fd);
}
*waiting_item = *waiting_back;
utarray_pop_back(loop->waiting);
event_loop_item *items_item =
(event_loop_item *) utarray_eltptr(loop->items, index);
event_loop_item *items_back = (event_loop_item *) utarray_back(loop->items);
*items_item = *items_back;
utarray_pop_back(loop->items);
}
/* Poll the file descriptors associated to this event loop.
* See http://linux.die.net/man/2/poll */
int event_loop_poll(event_loop *loop) {
return poll((struct pollfd *) utarray_front(loop->waiting),
utarray_len(loop->waiting), -1);
}
/* Get the total number of file descriptors participating in the event loop. */
int64_t event_loop_size(event_loop *loop) {
return utarray_len(loop->waiting);
}
/* Get the pollfd structure associated to a file descriptor participating in the
* event loop. */
struct pollfd *event_loop_get(event_loop *loop, int64_t index) {
return (struct pollfd *) utarray_eltptr(loop->waiting, index);
}
/* Set the data connection information for participant in the event loop. */
void event_loop_set_data(event_loop *loop, int64_t index, void *data) {
event_loop_item *item =
(event_loop_item *) utarray_eltptr(loop->items, index);
item->data = data;
}
/* Get the data connection information for participant in the event loop. */
void *event_loop_get_data(event_loop *loop, int64_t index) {
event_loop_item *item =
(event_loop_item *) utarray_eltptr(loop->items, index);
return item->data;
}
/* Free the space associated to the event loop.
* Does not free the event_loop datastructure itself. */
void event_loop_free(event_loop *loop) {
utarray_free(loop->items);
utarray_free(loop->waiting);
}
+38
View File
@@ -0,0 +1,38 @@
#ifndef EVENT_LOOP_H
#define EVENT_LOOP_H
#include <poll.h>
#include <stdint.h>
#include "utarray.h"
typedef struct {
/* The type of connection (e.g. redis, client, manager, data transfer). */
int type;
/* Data associated with the connection (managed by the user) */
void *data;
} event_loop_item;
typedef struct {
/* Array of event_loop_items that hold information for connections. */
UT_array *items;
/* Array of file descriptors that are waiting, corresponding to items. */
UT_array *waiting;
} event_loop;
/* Event loop functions. */
void event_loop_init(event_loop *loop);
void event_loop_free(event_loop *loop);
int64_t event_loop_attach(event_loop *loop,
int type,
void *data,
int fd,
int events);
void event_loop_detach(event_loop *loop, int64_t index, int shall_close);
int event_loop_poll(event_loop *loop);
int64_t event_loop_size(event_loop *loop);
struct pollfd *event_loop_get(event_loop *loop, int64_t index);
void event_loop_set_data(event_loop *loop, int64_t index, void *data);
void *event_loop_get_data(event_loop *loop, int64_t index);
#endif
+29
View File
@@ -0,0 +1,29 @@
#ifndef DB_H
#define DB_H
#include "event_loop.h"
typedef struct db_conn_impl db_conn;
/* Connect to the global system store at address and port. The last
* parameter is an output parameter and we assume the memory is
* allocated by the caller. */
void db_connect(const char *db_address,
int db_port,
const char *client_type,
const char *client_addr,
int client_port,
db_conn *db);
/* Attach global system store onnection to event loop. Returns the index of the
* connection in the loop. */
int64_t db_attach(db_conn *db, event_loop *loop, int connection_type);
/* This function will be called by the user if there is a new event in the
* event loop associated with the global system store connection. */
void db_event(db_conn *db);
/* Disconnect from the global system store. */
void db_disconnect(db_conn *db);
#endif
+15
View File
@@ -0,0 +1,15 @@
#include "common.h"
#include "db.h"
typedef void (*lookup_callback)(void *);
/* Register a new object with the directory. */
void object_table_add(db_conn *db, unique_id object_id);
/* Remove object from the directory */
void object_table_remove(db_conn *db, unique_id object_id);
/* Look up entry from the directory */
void object_table_lookup(db_conn *db,
unique_id object_id,
lookup_callback callback);
+188
View File
@@ -0,0 +1,188 @@
/* Redis implementation of the global state store */
#include <assert.h>
#include "common.h"
#include "db.h"
#include "object_table.h"
#include "event_loop.h"
#include "redis.h"
static void poll_add_read(void *privdata) {
db_conn *conn = (db_conn *) privdata;
if (!conn->reading) {
conn->reading = 1;
event_loop_get(conn->loop, 0)->events |= POLLIN;
}
}
static void poll_del_read(void *privdata) {
db_conn *conn = (db_conn *) privdata;
if (conn->reading) {
conn->reading = 0;
event_loop_get(conn->loop, 0)->events &= ~POLLIN;
}
}
static void poll_add_write(void *privdata) {
db_conn *conn = (db_conn *) privdata;
if (!conn->writing) {
conn->writing = 1;
event_loop_get(conn->loop, 0)->events |= POLLOUT;
}
}
static void poll_del_write(void *privdata) {
db_conn *conn = (db_conn *) privdata;
if (conn->writing) {
conn->writing = 0;
event_loop_get(conn->loop, 0)->events &= ~POLLOUT;
}
}
#define LOG_REDIS_ERR(context, M, ...) \
fprintf(stderr, "[ERROR] (%s:%d: message: %s) " M "\n", __FILE__, __LINE__, \
context->errstr, ##__VA_ARGS__)
#define CHECK_REDIS_CONNECT(CONTEXT_TYPE, context, M, ...) \
do { \
CONTEXT_TYPE *_context = (context); \
if (!_context) { \
LOG_ERR("could not allocate redis context"); \
exit(-1); \
} \
if (_context->err) { \
LOG_REDIS_ERR(_context, M, ##__VA_ARGS__); \
exit(-1); \
} \
} while (0);
void db_connect(const char *address,
int port,
const char *client_type,
const char *client_addr,
int client_port,
db_conn *db) {
/* Sync connection for initial handshake */
redisReply *reply;
long long num_clients;
redisContext *context = redisConnect(address, port);
CHECK_REDIS_CONNECT(redisContext, context, "could not connect to redis %s:%d",
address, port);
/* Add new client using optimistic locking. */
while (1) {
reply = redisCommand(context, "WATCH %s", client_type);
freeReplyObject(reply);
reply = redisCommand(context, "HLEN %s", client_type);
num_clients = reply->integer;
freeReplyObject(reply);
reply = redisCommand(context, "MULTI");
freeReplyObject(reply);
reply = redisCommand(context, "HSET %s %lld %s:%d", client_type,
num_clients, client_addr, client_port);
freeReplyObject(reply);
reply = redisCommand(context, "EXEC");
if (reply) {
freeReplyObject(reply);
break;
}
freeReplyObject(reply);
}
redisFree(context);
db->client_type = strdup(client_type);
db->client_id = num_clients;
db->reading = 0;
db->writing = 0;
/* Establish async connection */
db->context = redisAsyncConnect(address, port);
CHECK_REDIS_CONNECT(redisAsyncContext, db->context,
"could not connect to redis %s:%d", address, port);
db->context->data = (void *) db;
}
void db_event(db_conn *db) {
if (db->reading) {
redisAsyncHandleRead(db->context);
}
if (db->writing) {
redisAsyncHandleWrite(db->context);
}
}
int64_t db_attach(db_conn *db, event_loop *loop, int connection_type) {
db->loop = loop;
redisAsyncContext *ac = db->context;
redisContext *c = &(ac->c);
if (ac->ev.data != NULL) {
return REDIS_ERR;
}
ac->ev.addRead = poll_add_read;
ac->ev.delRead = poll_del_read;
ac->ev.addWrite = poll_add_write;
ac->ev.delWrite = poll_del_write;
// TODO(pcm): Implement cleanup function
ac->ev.data = db;
return event_loop_attach(loop, connection_type, NULL, c->fd,
POLLIN | POLLOUT);
}
void object_table_add(db_conn *db, unique_id object_id) {
static char hex_object_id[2 * UNIQUE_ID_SIZE + 1];
sha1_to_hex(&object_id.id[0], &hex_object_id[0]);
redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%s %d",
&hex_object_id[0], 0);
if (db->context->err) {
LOG_REDIS_ERR(db->context, "could not add object_table entry");
}
}
void object_table_lookup_callback(redisAsyncContext *c,
void *r,
void *privdata) {
redisReply *reply = r;
if (reply == NULL)
return;
lookup_callback callback = privdata;
char *str = malloc(reply->len);
memcpy(str, reply->str, reply->len);
callback(str);
}
void object_table_fetch_addr_port(redisAsyncContext *c,
void *r,
void *privdata) {
redisReply *reply = r;
if (reply == NULL)
return;
long long manager_id = -1;
if (reply->type == REDIS_REPLY_STRING) {
manager_id = strtoll(reply->str, NULL, 10);
} else if (reply->type != REDIS_REPLY_INTEGER) {
manager_id = reply->integer;
} else {
LOG_ERR("expected integer or string, received type %d", reply->type);
exit(-1);
}
db_conn *db = c->data;
redisAsyncCommand(db->context, object_table_lookup_callback, privdata,
"HGET %s %lld", db->client_type, manager_id);
}
void object_table_lookup(db_conn *db,
unique_id object_id,
lookup_callback callback) {
static char hex_object_id[2 * UNIQUE_ID_SIZE + 1];
sha1_to_hex(&object_id.id[0], &hex_object_id[0]);
redisAsyncCommand(db->context, object_table_fetch_addr_port, callback,
"SRANDMEMBER obj:%s", &hex_object_id[0]);
if (db->context->err) {
LOG_REDIS_ERR(db->context, "error in object_table lookup");
}
}
+26
View File
@@ -0,0 +1,26 @@
#include "db.h"
#include "object_table.h"
#include "hiredis/hiredis.h"
#include "hiredis/async.h"
struct db_conn_impl {
/* String that identifies this client type. */
char *client_type;
/* Unique ID for this client within the type. */
int64_t client_id;
/* Redis context for this global state store connection. */
redisAsyncContext *context;
/* Which events are we processing (read, write)? */
int reading, writing;
/* The event loop this global state store connection is part of. */
event_loop *loop;
};
void object_table_fetch_addr_port(redisAsyncContext *c,
void *r,
void *privdata);
void object_table_lookup_callback(redisAsyncContext *c,
void *r,
void *privdata);
+69
View File
@@ -0,0 +1,69 @@
#include "greatest.h"
#include <assert.h>
#include "event_loop.h"
#include "state/db.h"
#include "state/object_table.h"
#include "state/redis.h"
SUITE(db_tests);
int lookup_successful = 0;
const char *manager_addr = "127.0.0.1";
int manager_port = 12345;
char received_addr[16] = {0};
char received_port[6] = {0};
void test_callback(void *userdata);
void test_callback(void *userdata) {
char *reply = userdata;
lookup_successful = 1;
if (!reply ||
sscanf(reply, "%15[0-9.]:%5[0-9]", received_addr, received_port) != 2) {
assert(0);
}
free(reply);
}
TEST object_table_lookup_test(void) {
event_loop loop;
event_loop_init(&loop);
db_conn conn;
db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, manager_port,
&conn);
int64_t index = db_attach(&conn, &loop, 0);
unique_id id = {{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}};
object_table_add(&conn, id);
object_table_lookup(&conn, id, test_callback);
while (!lookup_successful) {
int num_ready = event_loop_poll(&loop);
if (num_ready < 0) {
exit(-1);
}
for (int i = 0; i < event_loop_size(&loop); ++i) {
struct pollfd *waiting = event_loop_get(&loop, i);
if (waiting->revents == 0)
continue;
if (i == index) {
db_event(&conn);
}
}
}
ASSERT_STR_EQ(&received_addr[0], manager_addr);
ASSERT_EQ(atoi(received_port), manager_port);
PASS();
}
SUITE(db_tests) {
RUN_TEST(object_table_lookup_test);
}
GREATEST_MAIN_DEFS();
int main(int argc, char **argv) {
GREATEST_MAIN_BEGIN();
RUN_SUITE(db_tests);
GREATEST_MAIN_END();
}
+4
View File
@@ -0,0 +1,4 @@
wget http://download.redis.io/releases/redis-3.2.3.tar.gz
tar xvfz redis-3.2.3.tar.gz
cd redis-3.2.3
make
+1023
View File
File diff suppressed because it is too large Load Diff
Vendored Submodule
+1
Submodule thirdparty/hiredis added at 5f98e1d35d
+238
View File
@@ -0,0 +1,238 @@
/*
Copyright (c) 2008-2016, Troy D. Hanson http://troydhanson.github.com/uthash/
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/* a dynamic array implementation using macros
*/
#ifndef UTARRAY_H
#define UTARRAY_H
#define UTARRAY_VERSION 2.0.1
#ifdef __GNUC__
#define _UNUSED_ __attribute__ ((__unused__))
#else
#define _UNUSED_
#endif
#include <stddef.h> /* size_t */
#include <string.h> /* memset, etc */
#include <stdlib.h> /* exit */
#ifndef oom
#define oom() exit(-1)
#endif
typedef void (ctor_f)(void *dst, const void *src);
typedef void (dtor_f)(void *elt);
typedef void (init_f)(void *elt);
typedef struct {
size_t sz;
init_f *init;
ctor_f *copy;
dtor_f *dtor;
} UT_icd;
typedef struct {
unsigned i,n;/* i: index of next available slot, n: num slots */
UT_icd icd; /* initializer, copy and destructor functions */
char *d; /* n slots of size icd->sz*/
} UT_array;
#define utarray_init(a,_icd) do { \
memset(a,0,sizeof(UT_array)); \
(a)->icd = *(_icd); \
} while(0)
#define utarray_done(a) do { \
if ((a)->n) { \
if ((a)->icd.dtor) { \
unsigned _ut_i; \
for(_ut_i=0; _ut_i < (a)->i; _ut_i++) { \
(a)->icd.dtor(utarray_eltptr(a,_ut_i)); \
} \
} \
free((a)->d); \
} \
(a)->n=0; \
} while(0)
#define utarray_new(a,_icd) do { \
(a) = (UT_array*)malloc(sizeof(UT_array)); \
if ((a) == NULL) oom(); \
utarray_init(a,_icd); \
} while(0)
#define utarray_free(a) do { \
utarray_done(a); \
free(a); \
} while(0)
#define utarray_reserve(a,by) do { \
if (((a)->i+(by)) > (a)->n) { \
char *utarray_tmp; \
while (((a)->i+(by)) > (a)->n) { (a)->n = ((a)->n ? (2*(a)->n) : 8); } \
utarray_tmp=(char*)realloc((a)->d, (a)->n*(a)->icd.sz); \
if (utarray_tmp == NULL) oom(); \
(a)->d=utarray_tmp; \
} \
} while(0)
#define utarray_push_back(a,p) do { \
utarray_reserve(a,1); \
if ((a)->icd.copy) { (a)->icd.copy( _utarray_eltptr(a,(a)->i++), p); } \
else { memcpy(_utarray_eltptr(a,(a)->i++), p, (a)->icd.sz); }; \
} while(0)
#define utarray_pop_back(a) do { \
if ((a)->icd.dtor) { (a)->icd.dtor( _utarray_eltptr(a,--((a)->i))); } \
else { (a)->i--; } \
} while(0)
#define utarray_extend_back(a) do { \
utarray_reserve(a,1); \
if ((a)->icd.init) { (a)->icd.init(_utarray_eltptr(a,(a)->i)); } \
else { memset(_utarray_eltptr(a,(a)->i),0,(a)->icd.sz); } \
(a)->i++; \
} while(0)
#define utarray_len(a) ((a)->i)
#define utarray_eltptr(a,j) (((j) < (a)->i) ? _utarray_eltptr(a,j) : NULL)
#define _utarray_eltptr(a,j) ((a)->d + ((a)->icd.sz * (j)))
#define utarray_insert(a,p,j) do { \
if ((j) > (a)->i) utarray_resize(a,j); \
utarray_reserve(a,1); \
if ((j) < (a)->i) { \
memmove( _utarray_eltptr(a,(j)+1), _utarray_eltptr(a,j), \
((a)->i - (j))*((a)->icd.sz)); \
} \
if ((a)->icd.copy) { (a)->icd.copy( _utarray_eltptr(a,j), p); } \
else { memcpy(_utarray_eltptr(a,j), p, (a)->icd.sz); }; \
(a)->i++; \
} while(0)
#define utarray_inserta(a,w,j) do { \
if (utarray_len(w) == 0) break; \
if ((j) > (a)->i) utarray_resize(a,j); \
utarray_reserve(a,utarray_len(w)); \
if ((j) < (a)->i) { \
memmove(_utarray_eltptr(a,(j)+utarray_len(w)), \
_utarray_eltptr(a,j), \
((a)->i - (j))*((a)->icd.sz)); \
} \
if ((a)->icd.copy) { \
unsigned _ut_i; \
for(_ut_i=0;_ut_i<(w)->i;_ut_i++) { \
(a)->icd.copy(_utarray_eltptr(a, (j) + _ut_i), _utarray_eltptr(w, _ut_i)); \
} \
} else { \
memcpy(_utarray_eltptr(a,j), _utarray_eltptr(w,0), \
utarray_len(w)*((a)->icd.sz)); \
} \
(a)->i += utarray_len(w); \
} while(0)
#define utarray_resize(dst,num) do { \
unsigned _ut_i; \
if ((dst)->i > (unsigned)(num)) { \
if ((dst)->icd.dtor) { \
for (_ut_i = (num); _ut_i < (dst)->i; ++_ut_i) { \
(dst)->icd.dtor(_utarray_eltptr(dst, _ut_i)); \
} \
} \
} else if ((dst)->i < (unsigned)(num)) { \
utarray_reserve(dst, (num) - (dst)->i); \
if ((dst)->icd.init) { \
for (_ut_i = (dst)->i; _ut_i < (unsigned)(num); ++_ut_i) { \
(dst)->icd.init(_utarray_eltptr(dst, _ut_i)); \
} \
} else { \
memset(_utarray_eltptr(dst, (dst)->i), 0, (dst)->icd.sz*((num) - (dst)->i)); \
} \
} \
(dst)->i = (num); \
} while(0)
#define utarray_concat(dst,src) do { \
utarray_inserta(dst, src, utarray_len(dst)); \
} while(0)
#define utarray_erase(a,pos,len) do { \
if ((a)->icd.dtor) { \
unsigned _ut_i; \
for (_ut_i = 0; _ut_i < (len); _ut_i++) { \
(a)->icd.dtor(utarray_eltptr(a, (pos) + _ut_i)); \
} \
} \
if ((a)->i > ((pos) + (len))) { \
memmove(_utarray_eltptr(a, pos), _utarray_eltptr(a, (pos) + (len)), \
((a)->i - ((pos) + (len))) * (a)->icd.sz); \
} \
(a)->i -= (len); \
} while(0)
#define utarray_renew(a,u) do { \
if (a) utarray_clear(a); \
else utarray_new(a, u); \
} while(0)
#define utarray_clear(a) do { \
if ((a)->i > 0) { \
if ((a)->icd.dtor) { \
unsigned _ut_i; \
for(_ut_i=0; _ut_i < (a)->i; _ut_i++) { \
(a)->icd.dtor(_utarray_eltptr(a, _ut_i)); \
} \
} \
(a)->i = 0; \
} \
} while(0)
#define utarray_sort(a,cmp) do { \
qsort((a)->d, (a)->i, (a)->icd.sz, cmp); \
} while(0)
#define utarray_find(a,v,cmp) bsearch((v),(a)->d,(a)->i,(a)->icd.sz,cmp)
#define utarray_front(a) (((a)->i) ? (_utarray_eltptr(a,0)) : NULL)
#define utarray_next(a,e) (((e)==NULL) ? utarray_front(a) : ((((a)->i) > (utarray_eltidx(a,e)+1)) ? _utarray_eltptr(a,utarray_eltidx(a,e)+1) : NULL))
#define utarray_prev(a,e) (((e)==NULL) ? utarray_back(a) : ((utarray_eltidx(a,e) > 0) ? _utarray_eltptr(a,utarray_eltidx(a,e)-1) : NULL))
#define utarray_back(a) (((a)->i) ? (_utarray_eltptr(a,(a)->i-1)) : NULL)
#define utarray_eltidx(a,e) (((char*)(e) >= (a)->d) ? (((char*)(e) - (a)->d)/(a)->icd.sz) : -1)
/* last we pre-define a few icd for common utarrays of ints and strings */
static void utarray_str_cpy(void *dst, const void *src) {
char **_src = (char**)src, **_dst = (char**)dst;
*_dst = (*_src == NULL) ? NULL : strdup(*_src);
}
static void utarray_str_dtor(void *elt) {
char **eltc = (char**)elt;
if (*eltc != NULL) free(*eltc);
}
static const UT_icd ut_str_icd _UNUSED_ = {sizeof(char*),NULL,utarray_str_cpy,utarray_str_dtor};
static const UT_icd ut_int_icd _UNUSED_ = {sizeof(int),NULL,NULL,NULL};
static const UT_icd ut_ptr_icd _UNUSED_ = {sizeof(void*),NULL,NULL,NULL};
#endif /* UTARRAY_H */