Merge pull request #80 from quantopian/fawce_aug10

bugfixes
This commit is contained in:
fawce
2012-07-23 15:18:17 -07:00
6 changed files with 136 additions and 42 deletions
+1 -1
View File
@@ -150,7 +150,7 @@ class FinanceTestCase(TestCase):
# TODO: for some reason the orders aren't filled without an extra
# trade.
trade_count = 5
trade_count = 5000
self.zipline_test_config['order_count'] = trade_count - 1
self.zipline_test_config['trade_count'] = trade_count
self.zipline_test_config['order_amount'] = 1
-1
View File
@@ -4,7 +4,6 @@ from unittest2 import TestCase, skip
from zipline.core.monitor import Controller
class TestMonitor(TestCase):
def setUp(self):
self.log_handler = LoggingHandler()
+41 -35
View File
@@ -44,44 +44,44 @@ class TradeSimulationClient(Component):
return str(zp.FINANCE_COMPONENT.TRADING_CLIENT)
def set_algorithm(self, algorithm):
"""
:param algorithm: must implement the algorithm protocol. See
:py:mod:`zipline.test.algorithm`
"""
self.algorithm = algorithm
# register the trading_client's order method with the algorithm
# register the client's order method with the algorithm
self.algorithm.set_order(self.order)
# we need to provide the performance tracker with the
# sids referenced in the algorithm, so portfolio can
# initialize with all possible sids.
self.perf.set_sids(self.algorithm.get_sid_filter())
#TODO: re-enable initialization logging. This means we can't call set_algorithm
#until we have a context for this component. Possibly this could happen
# ask the algorithm to initialize, routing stdout to a zmq PUSH socket.
#with self.zmq_out.threadbound(), self.stdout_capture(self.logger, 'Algo print capture'):
# self.algorithm.initialize()
#if we don't have a log socket, initialize anyway.
#else:
# self.algorithm.initialize()
self.algorithm.initialize()
# N.B. Initialize is now called from open, because we
# need to have a socket open for logging.
def open(self):
self.result_feed = self.connect_result()
if not self.results_socket:
log.warn(" No results socket, will not broadcast sim data.")
else:
if self.results_socket:
sock = self.context.socket(zmq.PUSH)
sock.connect(self.results_socket)
self.results_socket = sock
self.sockets.append(sock)
self.out_socket = sock
self.setup_logging(sock)
self.perf.publish_to(sock)
self.initialize_algo()
def initialize_algo(self):
""" Setup loggers for algorithm and run algorithm's own
initialize method.
"""
self.logger = Logger("Print")
self.algo_log = Logger("AlgoLog")
self.algorithm.set_logger(self.algo_log)
self.run_logged_op(self.algorithm.initialize)
#Initialize log capture for testing purposes.
def setup_logging(self, socket = None):
sock = socket or self.results_socket
@@ -89,8 +89,8 @@ class TradeSimulationClient(Component):
socket = sock,
)
self.logger = Logger("Print")
# N.B. that this is a class, which is instantiated later
# This is a class, which is instantiated later
# in run_algorithm. The class provides a generator.
self.stdout_capture = stdout_only_pipe
@@ -175,37 +175,44 @@ class TradeSimulationClient(Component):
- Set the current portfolio for the algorithm as per protocol.
- Construct data based on backlog of events, send to algorithm.
"""
current_portfolio = self.perf.get_portfolio()
self.algorithm.set_portfolio(current_portfolio)
data = self.get_data()
if len(data) > 0:
data.portfolio = self.perf.get_portfolio()
# data injection pipeline for log rerouting
# any fields injected here should be added to
# LOG_EXTRA_FIELDS in zipline/protocol.py
if self.zmq_out:
self.run_logged_op(self.algorithm.handle_data, data)
def inject_event_data(record):
def run_logged_op(self, callable_op, *args, **kwargs):
""" Wrap a callable operation with the zmq logbook
handler if it exits."""
if self.zmq_out:
#Record the simulation time.
def inject_event_data(record):
# Record the simulation time.
record.extra['algo_dt'] = self.current_dt
record.extra['algo_dt'] = self.current_dt
data_injector = Processor(inject_event_data)
log_pipeline = NestedSetup([self.zmq_out,data_injector])
with log_pipeline.threadbound(), self.stdout_capture(self.logger, ''):
self.algorithm.handle_data(data)
data_injector = Processor(inject_event_data)
log_pipeline = NestedSetup([self.zmq_out,data_injector])
with log_pipeline.threadbound(), self.stdout_capture(self.logger, ''):
callable_op(*args, **kwargs)
# if no log socket, just run the algo normally
else:
self.algorithm.handle_data(data)
else:
callable_op(*args, **kwargs)
#Testing utility for log capture.
# TODO: remove test code from here.
def test_run_algorithm(self):
# since open is never called from some tests we need to
# set the logger explicitly
self.algorithm.set_logger(self.algo_log)
def inject_event_data(record):
record.extra['algo_dt'] = datetime.datetime.utcnow() #Mock an event.dt
# Mock an event.dt
record.extra['algo_dt'] = datetime.datetime.utcnow()
data_injector = Processor(inject_event_data)
log_pipeline = NestedSetup([self.zmq_out,
@@ -213,7 +220,6 @@ class TradeSimulationClient(Component):
data_injector])
with log_pipeline.threadbound(), self.stdout_capture(self.logger, ''):
self.algorithm.handle_data('data')
# if no log socket, just run the algo normally
def connect_order(self):
return self.connect_push_socket(self.addresses['order_address'])
+10 -2
View File
@@ -99,7 +99,6 @@ class Controller(object):
log.warn("Running Controller in development mode, will ONLY synchronize start.")
def init_zmq(self, flavor):
assert self.zmq_flavor in ['thread', 'mp', 'green']
if flavor == 'mp':
@@ -131,7 +130,6 @@ class Controller(object):
Give the controller a set set of components to manage and
a set of state transitions for the entire system.
"""
# A freeform topology is where we heartbeat with anything
# that shows up.
if topology == 'freeform':
@@ -289,6 +287,8 @@ class Controller(object):
# Hearbeat Cycle
# ==============
initializing = len(self.tracked) == 0 and len(self.finished) == 0
# Wait the responses
while self.alive:
@@ -318,6 +318,14 @@ class Controller(object):
log.info(repr(self.responses))
break
# if this is the first time heartbeating, break
# out early if we get everything tracked no need
# to hold out for the full heartbeat.
if initializing and not self.freeform:
if len(self.responses) == len(self.topology):
log.info("breaking out of initial heartbeat")
break
# ================
# Heartbeat Stats
# ================
+4
View File
@@ -198,6 +198,10 @@ class PerformanceTracker(object):
keep_transactions = True
)
def set_sids(self, sid_list):
for sid in sid_list:
self.cumulative_performance.positions[sid] = Position(sid)
def get_portfolio(self):
return self.cumulative_performance.as_portfolio()
+80 -3
View File
@@ -70,6 +70,9 @@ class TestAlgorithm():
def set_order(self, order_callable):
self.order = order_callable
def set_logger(self, logger):
pass
def set_portfolio(self, portfolio):
self.portfolio = portfolio
@@ -106,6 +109,9 @@ class HeavyBuyAlgorithm():
def set_order(self, order_callable):
self.order = order_callable
def set_logger(self, logger):
pass
def set_portfolio(self, portfolio):
self.portfolio = portfolio
@@ -129,6 +135,9 @@ class NoopAlgorithm(object):
def set_order(self, order_callable):
pass
def set_logger(self, logger):
pass
def set_portfolio(self, portfolio):
pass
@@ -138,17 +147,62 @@ class NoopAlgorithm(object):
def get_sid_filter(self):
return None
class ExceptionAlgorithm(object):
"""
Throw an exception from the method name specified in the
constructor.
"""
def __init__(self, throw_from):
self.throw_from == throw_from
def initialize(self):
if self.throw_from == "initialize":
raise Exception("Algo exception in initialize")
else:
pass
def set_order(self, order_callable):
if self.throw_from == "set_order":
raise Exception("Algo exception in set_order")
else:
pass
def set_logger(self, logger):
pass
def set_portfolio(self, portfolio):
if self.throw_from == "set_portfolio":
raise Exception("Algo exception in set_portfolio")
else:
pass
def handle_data(self, data):
if self.throw_from == "handle_data":
raise Exception("Algo exception in handle_data")
else:
pass
def get_sid_filter(self):
if self.throw_from == "get_sid_filter":
raise Exception("Algo exception in get_sid_filter")
else:
return [1]
class TestPrintAlgorithm():
def __init__(self):
pass
def initialize(self):
print "Initializing..."
def set_order(self, order_callable):
pass
def set_logger(self, logger):
pass
def set_portfolio(self, portfolio):
pass
@@ -157,4 +211,27 @@ class TestPrintAlgorithm():
pass
def get_sid_filter(self):
return None
return [1]
class TestLoggingAlgorithm():
def __init__(self):
self.log = None
def initialize(self):
self.log.info("Initializing...")
def set_order(self, order_callable):
pass
def set_logger(self, logger):
self.log = logger
def set_portfolio(self, portfolio):
pass
def handle_data(self, data):
self.log.info("Handling Data...")
def get_sid_filter(self):
return [1]