Made more submodules.

This commit is contained in:
Stephen Diehl
2012-05-10 15:46:19 -04:00
parent 62ec591d90
commit 00de461da8
11 changed files with 45 additions and 870 deletions
+35
View File
@@ -0,0 +1,35 @@
[loggers]
keys=root,simpleExample
[handlers]
keys=consoleHandler,filesystemHandler
[formatters]
keys=ziplineformat
# -------
[logger_root]
level=DEBUG
handlers=consoleHandler,filesystemHandler
qualname=ZiplineLogger
# -------
[handler_filesystemHandler]
class=RotatingFileHandler
level=DEBUG
formatter=ziplineformat
args=("/var/log/zipline/zipline.log",10*1024*1024,5)
[handler_consoleHandler]
class=StreamHandler
level=ERROR
formatter=ziplineformat
args=(sys.stdout,)
# -------
[formatter_ziplineformat]
format=%(asctime)s %(levelname)s %(filename)s %(funcName)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S %Z
View File
+2 -2
View File
@@ -7,12 +7,12 @@ import gevent_zeromq
from collections import OrderedDict
from protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \
states = CONTROL_STATES
from gpoll import _Poller as GeventPoller
from zipline.utils.gpoll import _Poller as GeventPoller
# Roll Call ( Discovery )
# -----------------------
+2
View File
@@ -36,6 +36,7 @@ Risk Report
"""
import logging
import datetime
import math
import pytz
@@ -44,6 +45,7 @@ import numpy.linalg as la
import zipline.util as qutil
import zipline.protocol as zp
LOGGER = logging.getLogger('ZiplineLogger')
def advance_by_months(dt, jump_in_months):
month = dt.month + jump_in_months
+1 -1
View File
@@ -68,7 +68,7 @@ from collections import defaultdict
from nose.tools import timed
import zipline.test.factory as factory
import zipline.utils.factory as factory
import zipline.util as qutil
import zipline.finance.risk as risk
import zipline.protocol as zp
-621
View File
@@ -1,621 +0,0 @@
import time
import gevent
import itertools
# pyzmq
import zmq
import gevent_zeromq
from collections import OrderedDict
from protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \
states = CONTROL_STATES
from gpoll import _Poller as GeventPoller
# Roll Call ( Discovery )
# -----------------------
#
# Controller ( 'foo', 'bar', 'fizz', 'pop' )
# ------------------
# | | | |
# +---+
# | 0 | ? ? ?
# +---+
# |
# IDENTITY: foo
# get message: PROTOCOL.HEARTBEAT
# reply with PROTOCOL.OK
#
# Controller topology = ( 'foo', 'bar', 'fizz', 'pop' )
# 'foo' in topology = YES ->
# track 'foo'
# ------------------
# | | | |
# +---+
# | 1 | ? ? ?
# +---+
# Heartbeating
# ------------
#
# Controller ( time = 2.717828 )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 0 | | 0 | | 0 | | 0 |
# +---+ +---+ +---+ +---+
# |
# IDENTITY: foo
# get message: time = 2.717828
# reply with [ foo, 2.71828 ]
#
# Controller ( foo.status = OK )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 1 | | 0 | | 0 | | 0 |
# +---+ +---+ +---+ +---+
# |
# Controller tracks this node as good
# for this heartbeat
# Shutdown
# --------
#
# Controller ( state = RUNNING )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 1 | | 1 | | 1 | | 1 |
# +---+ +---+ +---+ +---+
# |
# IDENTITY: foo
# send [ DONE ]
# Controller ( state = SHUTDOWN )
# Controller topology.remove('foo')
# ------------------
# | | |
# +---+ +---+ +---+ +---+
# | | | 1 | | 1 | | 1 |
# +---+ +---+ +---+ +---+
# |
# IDENTITY: foo
# yield, stop sending messages
# Termination
# ------------
#
# Controller ( state = TERMINATE )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 1 | | 1 | | 1 | | 1 |
# +---+ +---+ +---+ +---+
# |
# get message PROTOCOL.KILL
# Controller ( state = TERMINATE )
# ------------------
# | | | |
# +---+ +---+ +---+ +---+
# | 0 | | 0 | | 0 | | 0 |
# +---+ +---+ +---+ +---+
INIT, SOURCES_READY, RUNNING, TERMINATE = CONTROL_STATES
state_transitions = frozenset([
(-1 , INIT),
(INIT , SOURCES_READY),
(SOURCES_READY , RUNNING),
(INIT , TERMINATE),
(SOURCES_READY , TERMINATE),
(RUNNING , TERMINATE),
])
class UnknownChatter(Exception):
def __init__(self, name):
self.named = name
def __str__(self):
return """Component calling itself "%s" talking on unexpected channel"""\
% self.named
class Controller(object):
"""
A N to M messaging system for inter component communication.
:param pub_socket: Socket to publish messages, the starting
point of :func message_listener: .
:param route_socket: Socket to listen for status updates for
the individual components.
:func message_sender: .
:param logging: Logging interface for tracking broker state
Defaults to None
Topology is the set of components we expect to show up.
States are the transitions the sytems go through. The
simplest is from RUNNING -> NOT RUNNING .
Usage::
controller = Controller(
'tcp://127.0.0.1:5000',
'tcp://127.0.0.1:5001',
)
# typically you'd want to run this async to your main
# program since it blocks indefinetely.
controller.manage(
[ TOPOLOGY ]
[ STATES ]
)
"""
debug = False
period = 1
def __init__(self, pub_socket, route_socket, logging = None):
self.context = None
self.zmq = None
self.zmq_poller = None
self.running = False
self.polling = False
self.tracked = set()
self.responses = set()
self.ctime = 0
self.tic = time.time()
self.freeform = False
self._state = -1
self.associated = []
self.pub_socket = pub_socket
self.route_socket = route_socket
self.error_replay = OrderedDict()
if logging:
self.logging = logging
else:
import util as qutil
self.logging = qutil.LOGGER
def init_zmq(self, flavor):
assert self.zmq_flavor in ['thread', 'mp', 'green']
if flavor == 'mp':
self.zmq = zmq
self.context = self.zmq.Context()
self.zmq_poller = self.zmq.Poller
return
if flavor == 'thread':
self.zmq = zmq
self.context = self.zmq.Context.instance()
self.zmq_poller = self.zmq.Poller
return
if flavor == 'green':
self.zmq = gevent_zeromq.zmq
self.context = self.zmq.Context.instance()
self.zmq_poller = GeventPoller
return
if flavor == 'pypy':
self.zmq = zmq
self.context = self.zmq.Context.instance()
self.zmq_poller = self.zmq.Poller
return
def manage(self, topology, states=None, context=None):
"""
Give the controller a set set of components to manage and
a set of state transitions for the entire system.
"""
# A freeform topology is where we heartbeat with anything
# that shows up.
if topology == 'freeform':
self.freeform = True
self.topology = frozenset([])
else:
self.freeform = False
self.topology = frozenset(topology)
self.polling = True
self.state = CONTROL_STATES.INIT
@property
def state(self):
return self._state
@state.setter
def state(self, new):
old, self._state = self._state, new
if (old, new) not in state_transitions:
raise RuntimeError("[Controller] Invalid State Transition : %s -> %s" %(old, new))
else:
self.logging.info("[Controller] State Transition : %s -> %s" %(old, new))
def run(self):
self.running = True
self.init_zmq(self.zmq_flavor)
try:
return self._poll() # use a python loop
except KeyboardInterrupt:
self.logging.info('Shutdown event loop')
def log_status(self):
"""
Snapshot of the tracked components at every period.
"""
#self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],))
pass
def replay_errors(self):
"""
Replay the errors in the order they were reported to the
controller.
"""
return [ a for a in sorted(self.replay_errors.keys())]
# -------------
# Publications
# -------------
def send_heart(self):
if not self.running:
return
heartbeat_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.HEARTBEAT,
str(self.ctime)
)
self.pub.send(heartbeat_frame)
def send_hardkill(self):
if not self.running:
return
kill_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.KILL,
''
)
self.pub.send(kill_frame)
def send_softkill(self):
if not self.running:
return
soft_frame = CONTROL_FRAME(
CONTROL_PROTOCOL.SHUTDOWN,
''
)
self.pub.send(soft_frame)
# -----------
# Event Loops
# -----------
def _poll(self):
assert self.route_socket
assert self.pub_socket
assert self.cancel_socket
# -- Publish --
# =============
self.pub = self.context.socket(self.zmq.PUB)
self.pub.bind(self.pub_socket)
# -- Cancel --
# =============
assert isinstance(self.cancel_socket,basestring), self.cancel_socket
self.cancel = self.context.socket(self.zmq.REP)
self.cancel.connect(self.cancel_socket)
# -- Router --
# =============
self.router = self.context.socket(self.zmq.ROUTER)
self.router.bind(self.route_socket)
poller = self.zmq.Poller()
poller.register(self.router, self.zmq.POLLIN)
poller.register(self.cancel, self.zmq.POLLIN)
self.associated += [self.pub, self.router, self.cancel]
# TODO: actually do this
self.state = CONTROL_STATES.SOURCES_READY
buffer = []
for i in itertools.count(0):
self.log_status()
self.responses = set()
self.ctime = time.time()
self.send_heart()
while self.polling:
# Reset the responses for this cycle
socks = dict(poller.poll(self.period))
tic = time.time()
if tic - self.ctime > self.period:
break
if self.router in socks and socks[self.router] == self.zmq.POLLIN:
rawmessage = self.router.recv()
if rawmessage:
buffer.append(rawmessage)
try:
if not self.router.getsockopt(self.zmq.RCVMORE):
self.handle_recv(buffer[:])
buffer = []
except INVALID_CONTROL_FRAME:
self.logging.error('Invalid frame', rawmessage)
pass
if self.cancel in socks and socks[self.cancel] == self.zmq.POLLIN:
self.logging.info('[Controller] Received Cancellation')
rawmessage = self.cancel.recv()
self.shutdown(soft=True)
break
self.beat()
if self.zmq_flavor == 'green':
gevent.sleep(0)
if self.state is CONTROL_STATES.TERMINATE:
break
if not self.polling:
break
# After loop exits
self.terminated = True
def beat(self):
# These the set overloaded operations
# A & B ~ set.intersection
# A - B ~ set.difference
# * good - Components we are currently tracking and who just sent
# us back the right response.
# * bad - Components we are currently tracking but who did not
# send us back a response.
# * new - Components we haven't heard from yet, but sent back the
# right response.
good = self.tracked & self.responses
bad = self.tracked - good
new = self.responses - good
for component in new:
self.new(component)
for component in bad:
self.fail(component)
# --------------
# Init Handlers
# --------------
def new_source(self):
if self.state is CONTROL_STATES.RUNNING:
self.state = SOURCES_READY
def new_universal(self):
pass
# The various "states of being that a component can inform us
# of
def new(self, component):
if self.state is CONTROL_STATES.TERMINATE:
return
self.logging.info('[Controller] Now Tracking "%s" ' % component)
universal = self.new_universal
init_handlers = {
'FEED' : self.new_source,
}
if component in self.topology or self.freeform:
init_handlers.get(component, universal)()
self.tracked.add(component)
else:
# Some sort of socket collision has occured, this is
# a very bad failure mode.
raise UnknownChatter(component)
# ------------------
# Epic Fail Handling
# ------------------
def fail_universal(self):
pass
# TODO: this requires higher order functionality
#self.logging.error('[Controller] System in exception state, shutting down')
#self.shutdown(soft=True)
def fail(self, component):
if self.state is CONTROL_STATES.TERMINATE:
return
universal = self.fail_universal
fail_handlers = { }
if component in self.topology or self.freeform:
self.logging.info('[Controller] Component "%s" timed out' % component)
self.tracked.remove(component)
fail_handlers.get(component, universal)()
# -------------------
# Completion Handling
# -------------------
def done(self, component):
self.logging.info('[Controller] Component "%s" done.' % component)
# --------------
# Error Handling
# --------------
def exception_universal(self):
"""
Shutdown the system on failure.
"""
self.logging.error('[Controller] System in exception state, shutting down')
self.shutdown(soft=True)
def exception(self, component, failure):
universal = self.exception_universal
exception_handlers = { }
if component in self.topology or self.freeform:
self.error_replay[(component, time.time())] = failure
self.logging.error('[Controller] Component "%s" in exception state' % component)
exception_handlers.get(component, universal)()
else:
raise UnknownChatter(component)
# -----------------
# Protocol Handling
# -----------------
def handle_recv(self, msg):
"""
Check for proper framing at the transport layer.
Seperates the proper frames from anything else that might
be coming over the wire. Which shouldn't happen ... right?
"""
identity = msg[0]
id, status = CONTROL_UNFRAME(msg[1])
# A component is telling us its alive:
if id is CONTROL_PROTOCOL.OK:
if status == str(self.ctime):
self.responses.add(identity)
else:
# Otherwise its something weird and we don't know
# what to do so just say so
self.logging.error("Weird stuff happened: %s" % msg)
# A component is telling us it failed, and how
if id is CONTROL_PROTOCOL.EXCEPTION:
self.exception(identity, status)
# A component is telling us its done with work and won't
# be talking to us anymore
if id is CONTROL_PROTOCOL.DONE:
self.done(identity)
# -------------------
# Hooks for Endpoints
# -------------------
# These are all connects so no complex allocation logic is
# needed. Dealers and Subscribers can all come and go as a
# function of time without impacting flow of the whole
# system.
def message_sender(self, identity, context = None):
"""
Spin off a socket used for sending messages to this
controller.
"""
if not context:
context = self.zmq.Context.instance()
s = context.socket(zmq.DEALER)
s.setsockopt(zmq.IDENTITY, identity)
s.connect(self.route_socket)
self.associated.append(s)
return s
def message_listener(self, context = None):
"""
Spin off a socket used for receiving messages from this
controller.
"""
if not context:
context = self.zmq.Context.instance()
s = context.socket(zmq.SUB)
s.connect(self.pub_socket)
s.setsockopt(zmq.SUBSCRIBE, '')
self.associated.append(s)
return s
def do_error_replay(self):
for (component, time), error in self.error_replay.iteritems():
self.logging.info('[Controller] Error Log for -- %s --:\n%s' %
(component, error))
def shutdown(self, hard=False, soft=True, context=None):
if not self.polling:
return
self.polling = False
assert hard or soft, """ Must specify kill hard or soft """
if hard:
self.state = CONTROL_STATES.TERMINATE
self.logging.info('[Controller] Hard Shutdown')
#for asoc in self.associated:
#asoc.close()
if soft:
self.state = CONTROL_STATES.TERMINATE
self.logging.info('[Controller] Soft Shutdown')
self.send_softkill()
#for asoc in self.associated:
#asoc.close()
self.do_error_replay()
if __name__ == '__main__':
print 'Running on '\
'tcp://127.0.0.1:5000 '\
'tcp://127.0.0.1:5001 '
controller = Controller(
'tcp://127.0.0.1:5000',
'tcp://127.0.0.1:5001',
)
controller.zmq_flavor = 'green'
controller.manage(
'freeform',
[]
)
controller.run()
+2 -2
View File
@@ -123,8 +123,8 @@ import time
import copy
from collections import namedtuple
from protocol_utils import Enum, FrameExceptionFactory, namedict
from date_utils import EPOCH, UN_EPOCH
from utils.protocol_utils import Enum, FrameExceptionFactory, namedict
from utils.date_utils import EPOCH, UN_EPOCH
# -----------------------
# Control Protocol
-221
View File
@@ -1,221 +0,0 @@
import copy
import pandas
from ctypes import Structure, c_ubyte
from collections import MutableMapping
from itertools import izip
def Enum(*options):
"""
Fast enums are very important when we want really tight zmq
loops. These are probably going to evolve into pure C structs
anyways so might as well get going on that.
"""
class cstruct(Structure):
_fields_ = [(o, c_ubyte) for o in options]
__iter__ = lambda s: iter(range(len(options)))
return cstruct(*range(len(options)))
def FrameExceptionFactory(name):
"""
Exception factory with a closure around the frame class name.
"""
class InvalidFrame(Exception):
def __init__(self, got):
self.got = got
def __str__(self):
return "Invalid {framecls} Frame: {got}".format(
framecls = name,
got = self.got,
)
return InvalidFrame
class namedict(MutableMapping):
"""
Namedicts are dict like objects that have fields accessible by attribute lookup
as well as being indexable and iterable::
HEARTBEAT_PROTOCOL = namedict({
'REQ' : b'\x01',
'REP' : b'\x02',
})
HEARTBEAT_PROTOCOL.REQ # syntactic sugar
HEARTBEAT_PROTOCOL.REP # oh suga suga
For more complex structs use collections.namedtuple:
"""
def __init__(self, dct=None):
if(dct):
self.__dict__.update(dct)
def __setitem__(self, key, value):
"""
Required for use by pymongo as_class parameter to find.
"""
if(key == '_id'):
self.__dict__['id'] = value
else:
self.__dict__[key] = value
def __getitem__(self, key):
return self.__dict__[key]
def __delitem__(self, key):
del self.__dict__[key]
def __iter__(self):
return self.__dict__.iterkeys()
def __len__(self):
return len(self.__dict__)
def keys(self):
return self.__dict__.keys()
def as_dict(self):
# shallow copy is O(n)
return copy.copy(self.__dict__)
def delete(self, key):
del(self.__dict__[key])
def merge(self, other_nd):
assert isinstance(other_nd, namedict)
self.__dict__.update(other_nd.__dict__)
def __repr__(self):
return "namedict: " + str(self.__dict__)
def __eq__(self, other):
# !!!!!!!!!!!!!!!!!!!!
# !!!! DANGEROUS !!!!!
# !!!!!!!!!!!!!!!!!!!!
return other != None and self.__dict__ == other.__dict__
def has_attr(self, name):
return self.__dict__.has_key(name)
def as_series(self):
s = pandas.Series(self.__dict__)
s.name = self.sid
return s
class ndict(MutableMapping):
"""
Xtreme Namedicts 2.0
Ndicts are dict like objects that have fields accessible by attribute
lookup as well as being indexable and iterable. Done right
this time.
"""
def __init__(self, dct=None):
self.__internal = dict()
self.cls = frozenset(dir(self))
if dct:
self.__internal.update(dct)
# Abstact Overloads
# -----------------
def __setitem__(self, key, value):
"""
Required for use by pymongo as_class parameter to find.
"""
if key == '_id':
self.__internal['id'] = value
else:
self.__internal[key] = value
def __getattr__(self, key):
if key in self.cls:
return self.__dict__[key]
else:
return self.__internal[key]
def __getitem__(self, key):
return self.__internal[key]
def __delitem__(self, key):
del self.__internal[key]
def __iter__(self):
return self.__internal.iterkeys()
def __len__(self):
return len(self.__internal)
# Compatability with namedicts
# ----------------------------
# for compat, not the Python way to do things though...
# Deprecated, use builtin ``del`` operator.
delete = __delitem__
def has_attr(self, key):
"""
Deprecated, use builtin ``in`` operator.
"""
return self.__contains__(key)
def has_key(self, key):
return self.__contains__(key)
# Custom Methods
# --------------
def copy(self):
return ndict(copy.copy(self.__internal))
def as_dataframe(self):
"""
Return the representation as a Pandas dataframe.
"""
d = pandas.DataFrame(self.__internal)
return d
def as_series(self):
"""
Return the representation as a Pandas time series.
"""
s = pandas.Series(self.__internal)
s.name = self.sid
return s
def as_dict(self):
"""
Return the representation as a vanilla Python dict.
"""
# shallow copy is O(n)
return copy.copy(self.__internal)
def merge(self, other_nd):
"""
Merge in place with another ndict.
"""
assert isinstance(other_nd, ndict)
self.__internal.update(other_nd.__internal)
def __repr__(self):
return "namedict: " + str(self.__internal)
# Faster dictionary comparison?
#def __eq__(self, other):
#assert isinstance(other, ndict)
#keyeq = set(self.keys()) == set(other.keys())
#if not keyeq:
#return False
#for i, j in izip(self.itervalues(), other.itervalues()):
#if i != j:
#return False
#return True
View File
@@ -6,7 +6,6 @@ import msgpack
import random
from datetime import datetime, timedelta
import zipline.util as qutil
import zipline.finance.risk as risk
import zipline.protocol as zp
from zipline.sources import SpecificEquityTrades, RandomEquityTrades
@@ -228,4 +227,4 @@ def create_trade_source(sids, trade_count, trade_time_increment, trading_environ
source = SpecificEquityTrades("flat", trade_history)
return source
+2 -21
View File
@@ -3,26 +3,7 @@ Small classes to assist with timezone calculations, LOGGER configuration,
and other common operations.
"""
import datetime
import pytz
import logging
import logging.handlers
import logging.config
LOGGER = logging.getLogger('ZiplineLogger')
def configure_logging(loglevel=logging.DEBUG):
"""
Configures zipline.util.LOGGER to write a rotating file
(10M per file, 5 files) to `` /var/log/zipline.log ``.
"""
LOGGER.setLevel(loglevel)
handler = logging.handlers.RotatingFileHandler(
"/var/log/zipline/{lfn}.log".format(lfn="zipline"),
maxBytes=10*1024*1024, backupCount=5
)
handler.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)s %(filename)s %(funcName)s - %(message)s",
"%Y-%m-%d %H:%M:%S %Z")
)
LOGGER.addHandler(handler)
LOGGER.info("logging started...")
logging.config.fileConfig('logging.conf')