From a4ea3e3343245ff187c1904c6b0063f29d38abda Mon Sep 17 00:00:00 2001 From: fawce Date: Wed, 30 May 2012 23:55:25 -0400 Subject: [PATCH 01/15] marked a todo for a calculation bug --- zipline/finance/risk.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zipline/finance/risk.py b/zipline/finance/risk.py index f4bd736f..b4888f9e 100644 --- a/zipline/finance/risk.py +++ b/zipline/finance/risk.py @@ -187,6 +187,8 @@ class RiskMetrics(): return period_returns, returns def calculate_volatility(self, daily_returns): + # TODO: we should be using an annualized number for the + # square root, not the days in the period. return np.std(daily_returns, ddof=1) * math.sqrt(self.trading_days) def calculate_sharpe(self): From 036f9e93a7c87d18620cfaea27f492a64ca86b5f Mon Sep 17 00:00:00 2001 From: fawce Date: Thu, 31 May 2012 23:15:53 -0400 Subject: [PATCH 02/15] fixed a type-o in the documentation. --- zipline/finance/risk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zipline/finance/risk.py b/zipline/finance/risk.py index b4888f9e..a63cf872 100644 --- a/zipline/finance/risk.py +++ b/zipline/finance/risk.py @@ -27,7 +27,7 @@ Risk Report | alpha | The _algorithm_ alpha to the benchmark. | +-----------------+----------------------------------------------------+ | excess_return | The excess return of the algorithm over the | - | | benchmark. | + | | treasuries. | +-----------------+----------------------------------------------------+ | max_drawdown | The largest relative peak to relative trough move | | | for the portfolio returns between self.start_date | From 6d8a786214cb77a8858457ddeac3eeae88f310e9 Mon Sep 17 00:00:00 2001 From: fawce Date: Thu, 7 Jun 2012 13:53:45 -0400 Subject: [PATCH 03/15] updated performance to be trade by trade. --- zipline/finance/performance.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 5a2aeaef..f8380ae9 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -247,12 +247,12 @@ class PerformanceTracker(object): self.cumulative_performance.update_last_sale(event) self.todays_performance.update_last_sale(event) - - def handle_market_close(self): #calculate performance as of last trade self.cumulative_performance.calculate_performance() self.todays_performance.calculate_performance() + def handle_market_close(self): + # add the return results from today to the list of DailyReturn objects. todays_date = self.market_close.replace(hour=0, minute=0, second=0) todays_return_obj = risk.DailyReturn( @@ -503,7 +503,7 @@ class PerformancePeriod(object): 'capital_used' : self.period_capital_used, 'starting_value' : self.starting_value, 'starting_cash' : self.starting_cash, - 'ending_cash' : self.ending_cash, + 'cash' : self.ending_cash, 'portfolio_value' : self.ending_cash + self.ending_value, 'cumulative_capital_used' : self.cumulative_capital_used, 'max_capital_used' : self.max_capital_used, From 6a8654342add82ea6e6b69fe3c1d8ba8fae1d02f Mon Sep 17 00:00:00 2001 From: fawce Date: Thu, 7 Jun 2012 17:40:26 -0400 Subject: [PATCH 04/15] added an as_portfolio method to the PerformancePeriod. --- zipline/finance/performance.py | 90 +++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 34 deletions(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index f8380ae9..9f32ae09 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -59,11 +59,17 @@ Position Tracking +-----------------+----------------------------------------------------+ | last_sale_price | price at last sale of the security on the exchange | +-----------------+----------------------------------------------------+ + | cost_basis | the volume weighted average price paid per share | + +-----------------+----------------------------------------------------+ + Performance Period ================== +Performance Periods are updated with every trade. When calling +code needs + +---------------+------------------------------------------------------+ | key | value | +===============+======================================================+ @@ -190,7 +196,7 @@ class PerformanceTracker(object): ) def get_portfolio(self): - return self.cumulative_performance.to_ndict() + return self.cumulative_performance.as_portfolio() def open(self, context): if self.results_addr: @@ -490,64 +496,80 @@ class PerformancePeriod(object): self.positions[event.sid].last_sale_price = event.price self.positions[event.sid].last_sale_date = event.dt - def to_dict(self): - """ - Creates a dictionary representing the state of this performance - period. See header comments for a detailed description. - """ - positions = self.get_positions_list() - transactions = [x.as_dict() for x in self.processed_transactions] - + def __core_dict(self): rval = { 'ending_value' : self.ending_value, 'capital_used' : self.period_capital_used, 'starting_value' : self.starting_value, 'starting_cash' : self.starting_cash, - 'cash' : self.ending_cash, + 'ending_cash' : self.ending_cash, 'portfolio_value' : self.ending_cash + self.ending_value, 'cumulative_capital_used' : self.cumulative_capital_used, 'max_capital_used' : self.max_capital_used, 'max_leverage' : self.max_leverage, - 'positions' : positions, 'pnl' : self.pnl, 'returns' : self.returns, - 'transactions' : transactions, 'period_open' : self.period_open, 'period_close' : self.period_close } + return rval + + + def to_dict(self): + """ + Creates a dictionary representing the state of this performance + period. See header comments for a detailed description. + """ + rval = self.__core_dict() + positions = self.get_positions_list() + rval['positions'] = positions + # we want the key to be absent, not just empty - if not self.keep_transactions: - del rval['transactions'] + if self.keep_transactions: + transactions = [x.as_dict() for x in self.processed_transactions] + rval['transactions'] = transactions return rval - def to_ndict(self): + def as_portfolio(self): """ - Creates a ndict representing the state of this perfomance period. - Properties are the same as the results of to_dict. See header comments - for a detailed description. - + The purpose of this method is to provide a portfolio + object to algorithms running inside the same trading + client. The data needed is captured raw in a + PerformancePeriod, and in this method we rename some + fields for usability and remove extraneous fields. """ - positions = self.get_positions(ndicted=True) + portfolio = self.__core_dict() + # rename: + # ending_cash -> cash + # period_open -> backtest_start + # + # remove: + # period_close, starting_value, + # cumulative_capital_used, max_leverage, max_capital_used + portfolio['cash'] = portfolio['ending_cash'] + portfolio['start_date'] = portfolio['period_open'] + portfolio['position_value'] = portfolio['ending_value'] - positions = zp.ndict(positions) + del(portfolio['ending_cash']) + del(portfolio['period_open']) + del(portfolio['period_close']) + del(portfolio['starting_value']) + del(portfolio['ending_value']) + del(portfolio['cumulative_capital_used']) + del(portfolio['max_leverage']) + del(portfolio['max_capital_used']) - return zp.ndict({ - 'ending_value' : self.ending_value, - 'capital_used' : self.period_capital_used, - 'starting_value' : self.starting_value, - 'starting_cash' : self.starting_cash, - 'ending_cash' : self.ending_cash, - 'cumulative_capital_used' : self.cumulative_capital_used, - 'max_capital_used' : self.max_capital_used, - 'max_leverage' : self.max_leverage, - 'positions' : positions, - 'transactions' : self.processed_transactions - }) + portfolio['positions'] = self.get_positions(ndicted=True) + return zp.ndict(portfolio) def get_positions(self, ndicted=False): - positions = {} + if ndicted: + positions = zp.ndict({}) + else: + positions = {} + for sid, pos in self.positions.iteritems(): cur = pos.to_dict() if ndicted: From 0dea9c5ecb2154326ed7e7839d20b34c7b3aa6d6 Mon Sep 17 00:00:00 2001 From: fawce Date: Thu, 7 Jun 2012 17:58:30 -0400 Subject: [PATCH 05/15] finished comment --- zipline/finance/performance.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 9f32ae09..e237ec7b 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -68,7 +68,10 @@ Performance Period ================== Performance Periods are updated with every trade. When calling -code needs +code needs a portfolio object that fulfills the algorithm +protocol, use the PerformancePeriod.as_portfolio method. See that +method for comments on the specific fields provided (and +omitted). +---------------+------------------------------------------------------+ | key | value | From fefca77fe2ddced7deb7a04c015e8405e2cc2dbc Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Mon, 11 Jun 2012 09:26:48 -0400 Subject: [PATCH 06/15] FSM for Feed --- zipline/components/feed.py | 26 ++++++++++++++++++++++++-- zipline/components/passthrough.py | 2 +- zipline/core/component.py | 20 ++++++++++++++++++++ zipline/finance/vwap.py | 10 +++++----- zipline/transforms/base.py | 6 +++--- zipline/transitions.py | 19 ------------------- 6 files changed, 53 insertions(+), 30 deletions(-) diff --git a/zipline/components/feed.py b/zipline/components/feed.py index 1099f257..dd27336c 100644 --- a/zipline/components/feed.py +++ b/zipline/components/feed.py @@ -3,13 +3,27 @@ from collections import Counter from zipline.core.component import Component from zipline.components.aggregator import Aggregate +from zipline.utils.protocol_utils import Enum import zipline.protocol as zp +from zipline.transitions import WorkflowMeta from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ CONTROL_FRAME, CONTROL_UNFRAME LOGGER = logging.getLogger('ZiplineLogger') +# FSM +# --- + +INIT, READY, DRAINING = FEED_STATES = \ +Enum( 'INIT', 'READY', 'DRAINING') + +state_transitions = dict( + do_start = (-1 , INIT) , + do_run = (INIT , READY) , + do_drain = (READY , DRAINING) , +) + class Feed(Aggregate): """ Connects to N PULL sockets, publishing all messages received to a @@ -18,20 +32,26 @@ class Feed(Aggregate): one execution context (thread, process, etc) and run in another. """ + __metaclass__ = WorkflowMeta + + states = list(FEED_STATES) + transitions = state_transitions + initial_state = -1 + def init(self): self.sent_count = 0 self.received_count = 0 self.draining = False self.ds_finished_counter = 0 - # Depending on the size of this, might want to use a data - # structure with better asymptotics. self.data_buffer = {} # source_id -> integer count self.sent_counters = Counter() self.recv_counters = Counter() + self.state = INIT + @property def get_id(self): return "FEED" @@ -71,6 +91,8 @@ class Feed(Aggregate): """ Get the next message in chronological order. """ + + # is_full and draining defined in aggregator if not(self.is_full() or self.draining): return diff --git a/zipline/components/passthrough.py b/zipline/components/passthrough.py index 2b176774..e3f2f863 100644 --- a/zipline/components/passthrough.py +++ b/zipline/components/passthrough.py @@ -7,7 +7,7 @@ class PassthroughTransform(BaseTransform): """ def init(self): - self.state = { 'name': 'PASSTHROUGH' } + self.props = { 'name': 'PASSTHROUGH' } #TODO, could save some cycles by skipping the _UNFRAME call # and just setting value to original msg string. diff --git a/zipline/core/component.py b/zipline/core/component.py index d5b14d96..fbf3951e 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -568,3 +568,23 @@ class Component(object): pid = os.getpid() , pointer = hex(id(self)) , ) + + + @property + def state(self): + if not hasattr(self, '_state'): + self._state = self.initial_state + else: + return self._state + + @state.setter + def state(self, new): + if not hasattr(self, '_state'): + self._state = self.initial_state + + old = self._state + + if (old, new) in self.workflow: + self._state = new + else: + raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) diff --git a/zipline/finance/vwap.py b/zipline/finance/vwap.py index 35eb9578..09941fe1 100644 --- a/zipline/finance/vwap.py +++ b/zipline/finance/vwap.py @@ -7,20 +7,20 @@ from zipline.finance.movingaverage import EventWindow class VWAPTransform(BaseTransform): def init(self, name, daycount=3): - self.state = {} - self.state['name'] = name + self.props = {} + self.props['name'] = name self.daycount = daycount self.by_sid = defaultdict(self.create_vwap) @property def get_id(self): - return self.state['name'] + return self.props['name'] def transform(self, event): cur = self.by_sid[event.sid] cur.update(event) - self.state['value'] = cur.vwap - return self.state + self.props['value'] = cur.vwap + return self.props def create_vwap(self): return DailyVWAP(self.daycount) diff --git a/zipline/transforms/base.py b/zipline/transforms/base.py index 082e52b9..2c8ab856 100644 --- a/zipline/transforms/base.py +++ b/zipline/transforms/base.py @@ -25,7 +25,7 @@ class BaseTransform(Component): @property def get_id(self): - return self.state['name'] + return self.props['name'] @property def get_type(self): @@ -116,9 +116,9 @@ class BaseTransform(Component): Transforms run in parallel and results are merged into a single map, so transform names must be unique. Best practice - is to use the self.state object initialized from the transform + is to use the self.props object initialized from the transform configuration, and only set the transformed value:: - self.state['value'] = transformed_value + self.props['value'] = transformed_value """ raise NotImplementedError diff --git a/zipline/transitions.py b/zipline/transitions.py index 495a32d4..e47d54c8 100644 --- a/zipline/transitions.py +++ b/zipline/transitions.py @@ -39,24 +39,6 @@ class WorkflowMeta(type): """ Base metaclass component workflows. """ - @property - def state(self): - if not hasattr(self, '_state'): - self._state = self.initial_state - else: - return self._state - - @state.setter - def state(self, new): - if not hasattr(self, '_state'): - self._state = self.initial_state - - old = self._state - - if (old, new) in self.workflow: - self._state = new - else: - raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) def __new__(cls, name, mro, attrs): base = 'Component' @@ -71,7 +53,6 @@ class WorkflowMeta(type): raise RuntimeError('`workflow` is a reserved attribute.') if not state: - import pdb; pdb.set_trace() raise RuntimeError('Must specify states') if not transitions: From 270e0674269dacbc0d97f05e9f594c2d9aaebc7f Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Mon, 11 Jun 2012 13:10:05 -0400 Subject: [PATCH 07/15] FSM actions for Feed and Merge. --- zipline/components/aggregator.py | 35 +++++++++++++++++++------- zipline/components/feed.py | 40 +++++++++++------------------ zipline/components/merge.py | 9 +++++-- zipline/core/component.py | 43 +++++++++++++++++--------------- 4 files changed, 70 insertions(+), 57 deletions(-) diff --git a/zipline/components/aggregator.py b/zipline/components/aggregator.py index 5aadb7e8..a7bd6c82 100644 --- a/zipline/components/aggregator.py +++ b/zipline/components/aggregator.py @@ -9,16 +9,30 @@ Abstract base class for Feed and Merge. Feed Merge """ -import logging -from collections import Counter +import logbook import zipline.protocol as zp from zipline.core.component import Component from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ CONTROL_FRAME, CONTROL_UNFRAME +from zipline.utils.protocol_utils import Enum +from zipline.transitions import WorkflowMeta -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('Aggregate') + +# ================= +# State Transitions +# ================= + +INIT, READY, DRAINING = AGGREGATE_STATES = \ +Enum( 'INIT', 'READY', 'DRAINING') + +AGGREGATE_TRANSITIONS = dict( + do_start = (-1 , INIT) , + do_run = (INIT , READY) , + do_drain = (READY , DRAINING) , +) class Aggregate(Component): """ @@ -32,6 +46,9 @@ class Aggregate(Component): Feed and Merge define these differently. """ + abstract = True + __metaclass__ = WorkflowMeta + @property def get_type(self): return COMPONENT_TYPE.CONDUIT @@ -76,22 +93,21 @@ class Aggregate(Component): if len(self.data_buffer) == self.ds_finished_counter: #drain any remaining messages in the buffer - LOGGER.debug("draining feed") + log.debug("Draining Feed") self.drain() self.signal_done() else: try: event = self.unframe(message) - # deserialization error except zp.INVALID_DATASOURCE_FRAME as exc: + # Error deserializing return self.signal_exception(exc) try: self.append(event) self.send_next() - - # Invalid message except zp.INVALID_DATASOURCE_FRAME as exc: + # Invalid message return self.signal_exception(exc) # ------------- @@ -102,7 +118,7 @@ class Aggregate(Component): """ Send all messages in the buffer. """ - self.draining = True + self.state = DRAINING while self.pending_messages() > 0: self.send_next() @@ -114,7 +130,8 @@ class Aggregate(Component): return event = self.next() - if(event != None): + + if event: self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK) self.sent_counters[event.source_id] += 1 self.sent_count += 1 diff --git a/zipline/components/feed.py b/zipline/components/feed.py index dd27336c..2a7cff60 100644 --- a/zipline/components/feed.py +++ b/zipline/components/feed.py @@ -1,28 +1,15 @@ -import logging +import logbook from collections import Counter -from zipline.core.component import Component -from zipline.components.aggregator import Aggregate -from zipline.utils.protocol_utils import Enum +from zipline.components.aggregator import Aggregate, \ + AGGREGATE_STATES, AGGREGATE_TRANSITIONS import zipline.protocol as zp -from zipline.transitions import WorkflowMeta -from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ - CONTROL_FRAME, CONTROL_UNFRAME +log = logbook.Logger('Feed') -LOGGER = logging.getLogger('ZiplineLogger') - -# FSM -# --- - -INIT, READY, DRAINING = FEED_STATES = \ -Enum( 'INIT', 'READY', 'DRAINING') - -state_transitions = dict( - do_start = (-1 , INIT) , - do_run = (INIT , READY) , - do_drain = (READY , DRAINING) , -) +# ========= +# Component +# ========= class Feed(Aggregate): """ @@ -32,16 +19,13 @@ class Feed(Aggregate): one execution context (thread, process, etc) and run in another. """ - __metaclass__ = WorkflowMeta - - states = list(FEED_STATES) - transitions = state_transitions + states = list(AGGREGATE_STATES) + transitions = AGGREGATE_TRANSITIONS initial_state = -1 def init(self): self.sent_count = 0 self.received_count = 0 - self.draining = False self.ds_finished_counter = 0 self.data_buffer = {} @@ -50,12 +34,16 @@ class Feed(Aggregate): self.sent_counters = Counter() self.recv_counters = Counter() - self.state = INIT + self.state = AGGREGATE_STATES.INIT @property def get_id(self): return "FEED" + @property + def draining(self): + return self.state == DRAINING + # ------- # Sockets # ------- diff --git a/zipline/components/merge.py b/zipline/components/merge.py index 8a2ae7c1..bbb9dc6a 100644 --- a/zipline/components/merge.py +++ b/zipline/components/merge.py @@ -1,6 +1,6 @@ - import zipline.protocol as zp -from zipline.components.aggregator import Aggregate +from zipline.components.aggregator import Aggregate, \ + AGGREGATE_STATES, AGGREGATE_TRANSITIONS from collections import Counter @@ -8,6 +8,11 @@ class Merge(Aggregate): """ Merges multiple streams of events into single messages. """ + + states = list(AGGREGATE_STATES) + transitions = AGGREGATE_TRANSITIONS + initial_state = -1 + def init(self): self.sent_count = 0 self.received_count = 0 diff --git a/zipline/core/component.py b/zipline/core/component.py index fbf3951e..ba3f8e42 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -488,6 +488,29 @@ class Component(object): self.sockets.append(self.sync_socket) + # ----------- + # FSM Actions + # ----------- + + #@property + #def state(self): + #if not hasattr(self, '_state'): + #self._state = self.initial_state + #else: + #return self._state + + #@state.setter + #def state(self, new): + #if not hasattr(self, '_state'): + #self._state = self.initial_state + + #old = self._state + + #if (old, new) in self.workflow: + #self._state = new + #else: + #raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) + # --------------------- # Description and Debug # --------------------- @@ -568,23 +591,3 @@ class Component(object): pid = os.getpid() , pointer = hex(id(self)) , ) - - - @property - def state(self): - if not hasattr(self, '_state'): - self._state = self.initial_state - else: - return self._state - - @state.setter - def state(self, new): - if not hasattr(self, '_state'): - self._state = self.initial_state - - old = self._state - - if (old, new) in self.workflow: - self._state = new - else: - raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) From aac8e92b0656246649f8957f1dd37ec3fd5f48e1 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Mon, 11 Jun 2012 13:40:54 -0400 Subject: [PATCH 08/15] Logbook for component state tracking. --- zipline/components/feed.py | 2 +- zipline/components/tradesimulation.py | 6 ++-- zipline/core/component.py | 6 ++-- zipline/core/host.py | 21 +++++------ zipline/core/monitor.py | 51 ++++++++++++--------------- zipline/lines.py | 5 --- zipline/utils/logger.py | 1 + 7 files changed, 42 insertions(+), 50 deletions(-) diff --git a/zipline/components/feed.py b/zipline/components/feed.py index 2a7cff60..f51e515d 100644 --- a/zipline/components/feed.py +++ b/zipline/components/feed.py @@ -42,7 +42,7 @@ class Feed(Aggregate): @property def draining(self): - return self.state == DRAINING + return self.state == AGGREGATE_STATES.DRAINING # ------- # Sockets diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 79312002..f130be0b 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -1,4 +1,4 @@ -import logging +import logbook import datetime import zipline.protocol as zp @@ -8,7 +8,7 @@ from zipline.core.component import Component from zipline.finance.trading import TransactionSimulator from zipline.utils.protocol_utils import ndict -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('TradeSimulation') class TradeSimulationClient(Component): @@ -74,7 +74,7 @@ class TradeSimulationClient(Component): self.finish_simulation() def finish_simulation(self): - LOGGER.info("Client is DONE!") + log.info("TradeSimulation is Done") # signal the performance tracker that the simulation has # ended. Perf will internally calculate the full risk report. self.perf.handle_simulation_end() diff --git a/zipline/core/component.py b/zipline/core/component.py index ba3f8e42..df3de619 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -7,7 +7,7 @@ import sys import uuid import time import socket -import logging +import logbook import traceback import humanhash from setproctitle import setproctitle @@ -23,7 +23,7 @@ from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \ COMPONENT_FAILURE, CONTROL_FRAME -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('Component') from zipline.exceptions import ComponentNoInit from zipline.transitions import WorkflowMeta @@ -357,7 +357,7 @@ class Component(object): #notify internal work look that we're done self.done = True # TODO: use state flag - LOGGER.info("[%s] DONE" % self.get_id) + log.info("[%s] DONE" % self.get_id) # ----------- # Messaging diff --git a/zipline/core/host.py b/zipline/core/host.py index 16126465..3320bfbc 100644 --- a/zipline/core/host.py +++ b/zipline/core/host.py @@ -1,4 +1,4 @@ -import logging +import logbook import datetime from component import Component @@ -8,7 +8,7 @@ from zipline.components import Feed, Merge, PassthroughTransform, \ DataSource from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('Host') class ComponentHost(Component): """ @@ -89,7 +89,7 @@ class ComponentHost(Component): """ Setup the sync socket and poller. ( Bind ) """ - #LOGGER.debug("Connecting sync server.") + #log.debug("Connecting sync server.") self.sync_socket = self.context.socket(self.zmq.REP) self.sync_socket.bind(self.addresses['sync_address']) @@ -100,11 +100,11 @@ class ComponentHost(Component): self.sockets.append(self.sync_socket) def open(self): - LOGGER.info('== Roll Call ==\n') + log.info('== Roll Call ==\n') for component in self.components.itervalues(): - LOGGER.info(component) + log.info(component) - LOGGER.info('== End Roll Call ==\n') + log.info('== End Roll Call ==\n') for component in self.components.itervalues(): self.launch_component(component) @@ -117,7 +117,7 @@ class ComponentHost(Component): """ if len(self.components) == 0: - LOGGER.info("Component register is empty.") + log.info("Component register is empty.") return False return True @@ -138,14 +138,15 @@ class ComponentHost(Component): except ValueError as exc: self.signal_exception(exc) - if status == str(CONTROL_PROTOCOL.DONE): # TODO: other way around - LOGGER.debug("{id} is DONE".format(id=sync_id)) + # TODO: other way around + if status == str(CONTROL_PROTOCOL.DONE): + #log.debug("Component claims done: {id}".format(id=sync_id)) self.unregister_component(sync_id) self.state_flag = COMPONENT_STATE.DONE else: self.sync_register[sync_id] = datetime.datetime.utcnow() - #qutil.LOGGER.info("confirmed {id}".format(id=msg)) + #log.info("confirmed {id}".format(id=msg)) # send synchronization reply self.sync_socket.send('ack', self.zmq.NOBLOCK) diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 5b2384f4..eafa07bb 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -2,7 +2,7 @@ import zmq import time import gevent import itertools -import logging +import logbook import gevent_zeromq from collections import OrderedDict @@ -122,6 +122,9 @@ class UnknownChatter(Exception): return """Component calling itself "%s" talking on unexpected channel"""\ % self.named + +log = logbook.Logger('Controller') + class Controller(object): """ A N to M messaging system for inter component communication. @@ -133,9 +136,6 @@ class Controller(object): the individual components. :func message_sender: . - :param logging: Logging interface for tracking broker state - Defaults to None - Topology is the set of components we expect to show up. States are the transitions the sytems go through. The simplest is from RUNNING -> NOT RUNNING . @@ -159,7 +159,7 @@ class Controller(object): debug = False period = 1 - def __init__(self, pub_socket, route_socket, logger = None): + def __init__(self, pub_socket, route_socket): self.context = None self.zmq = None @@ -182,12 +182,6 @@ class Controller(object): self.error_replay = OrderedDict() - if logger: - self.logging = logger - else: - default_logger = logging.getLogger('ZiplineLogger') - self.logging = default_logger - def init_zmq(self, flavor): assert self.zmq_flavor in ['thread', 'mp', 'green'] @@ -240,9 +234,9 @@ class Controller(object): old, self._state = self._state, new if (old, new) not in state_transitions: - raise RuntimeError("[Controller] Invalid State Transition : %s -> %s" %(old, new)) + raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) else: - self.logging.info("[Controller] State Transition : %s -> %s" %(old, new)) + log.error("State Transition : %s -> %s" %(old, new)) def run(self): self.running = True @@ -251,13 +245,13 @@ class Controller(object): try: return self._poll() # use a python loop except KeyboardInterrupt: - self.logging.info('Shutdown event loop') + log.debug('Shutdown event loop') def log_status(self): """ Snapshot of the tracked components at every period. """ - #self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],)) + #log.info("Tracking component : %s" % ([c for c in self.tracked],)) pass def replay_errors(self): @@ -366,11 +360,11 @@ class Controller(object): self.handle_recv(buffer[:]) buffer = [] except INVALID_CONTROL_FRAME: - self.logging.error('Invalid frame', rawmessage) + log.error('Invalid frame', rawmessage) pass if socks.get(self.cancel) == self.zmq.POLLIN: - self.logging.info('[Controller] Received Cancellation') + log.info('Received Cancellation') rawmessage = self.cancel.recv() self.cancel.send('') self.shutdown(soft=True) @@ -430,7 +424,7 @@ class Controller(object): if self.state is CONTROL_STATES.TERMINATE: return - self.logging.info('[Controller] Now Tracking "%s" ' % component) + log.info(' Now Tracking "%s" ' % component) universal = self.new_universal init_handlers = { @@ -452,7 +446,7 @@ class Controller(object): def fail_universal(self): pass # TODO: this requires higher order functionality - #self.logging.error('[Controller] System in exception state, shutting down') + #log.error('System in exception state, shutting down') #self.shutdown(soft=True) def fail(self, component): @@ -463,7 +457,7 @@ class Controller(object): fail_handlers = { } if component in self.topology or self.freeform: - self.logging.info('[Controller] Component "%s" timed out' % component) + log.error('Component "%s" timed out' % component) self.tracked.remove(component) fail_handlers.get(component, universal)() @@ -472,7 +466,7 @@ class Controller(object): # ------------------- def done(self, component): - self.logging.info('[Controller] Component "%s" done.' % component) + log.info('Component "%s" signaled done.' % component) # -------------- # Error Handling @@ -482,7 +476,7 @@ class Controller(object): """ Shutdown the system on failure. """ - self.logging.error('[Controller] System in exception state, shutting down') + log.error('System in exception state, shutting down') self.shutdown(soft=True) def exception(self, component, failure): @@ -491,7 +485,7 @@ class Controller(object): if component in self.topology or self.freeform: self.error_replay[(component, time.time())] = failure - self.logging.error('[Controller] Component "%s" in exception state' % component) + log.error('Component in exception state: %s' % component) exception_handlers.get(component, universal)() else: @@ -517,8 +511,9 @@ class Controller(object): self.responses.add(identity) else: # Otherwise its something weird and we don't know - # what to do so just say so - self.logging.error("Weird stuff happened: %s" % msg) + # what to do so just say so, probably line noise + # from ZeroMQ + log.error("Weird stuff happened: %s" % msg) # A component is telling us it failed, and how if id is CONTROL_PROTOCOL.EXCEPTION: @@ -572,7 +567,7 @@ class Controller(object): def do_error_replay(self): for (component, time), error in self.error_replay.iteritems(): - self.logging.info('[Controller] Error Log for -- %s --:\n%s' % + log.info('Error Log for -- %s --:\n%s' % (component, error)) def shutdown(self, hard=False, soft=True, context=None): @@ -587,7 +582,7 @@ class Controller(object): if hard: self.state = CONTROL_STATES.TERMINATE - self.logging.info('[Controller] Hard Shutdown') + log.info('Hard Shutdown') #for asoc in self.associated: #asoc.close() @@ -595,7 +590,7 @@ class Controller(object): if soft: self.state = CONTROL_STATES.TERMINATE - self.logging.info('[Controller] Soft Shutdown') + log.info('Soft Shutdown') self.send_softkill() #for asoc in self.associated: diff --git a/zipline/lines.py b/zipline/lines.py index 070931c5..a229c9f0 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -60,8 +60,6 @@ before invoking simulate. +---------------------------------+ """ -import logging - import zipline.utils.factory as factory from zipline.components import DataSource @@ -73,8 +71,6 @@ from zipline.core.devsimulator import Simulator from zipline.core.monitor import Controller from zipline.finance.trading import SIMULATION_STYLE -LOGGER = logging.getLogger('ZiplineLogger') - class SimulatedTrading(object): """ Zipline with:: @@ -133,7 +129,6 @@ class SimulatedTrading(object): self.con = Controller( sockets[6], sockets[7], - logger = LOGGER ) self.con.cancel_socket = self.allocator.lease(1)[0] diff --git a/zipline/utils/logger.py b/zipline/utils/logger.py index c3b97cda..b044642f 100644 --- a/zipline/utils/logger.py +++ b/zipline/utils/logger.py @@ -3,6 +3,7 @@ Small classes to assist with timezone calculations, LOGGER configuration, and other common operations. """ +# DEPRECATED DO NOT USE import logging import logging.config From de9b15144ac51de48ee8ce60dc6913f1d2417456 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Mon, 11 Jun 2012 13:52:02 -0400 Subject: [PATCH 09/15] Update Thomas's tests to use logbook. --- tests/test_optimize.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/test_optimize.py b/tests/test_optimize.py index 35c1943b..cefafd8d 100644 --- a/tests/test_optimize.py +++ b/tests/test_optimize.py @@ -2,12 +2,9 @@ from unittest2 import TestCase, skip from nose.tools import timed from collections import defaultdict -import logging import numpy as np -from zipline.utils.logger import configure_logging - from zipline.core.devsimulator import AddressAllocator from zipline.optimize.factory import create_predictable_zipline @@ -15,7 +12,8 @@ DEFAULT_TIMEOUT = 15 # seconds EXTENDED_TIMEOUT = 90 allocator = AddressAllocator(1000) -LOGGER = logging.getLogger('ZiplineLogger') + +from logbook.compat import LoggingHandler class TestUpDown(TestCase): """This unittest verifies that the BuySellAlgorithm in @@ -26,14 +24,18 @@ class TestUpDown(TestCase): leased_sockets = defaultdict(list) def setUp(self): - configure_logging() self.zipline_test_config = { - 'allocator':allocator, - 'sid':133, - 'trade_count':5, - 'amplitude':30, - 'base_price':50 + 'allocator' : allocator, + 'sid' : 133, + 'trade_count' : 5, + 'amplitude' : 30, + 'base_price' : 50 } + self.log_handler = LoggingHandler() + self.log_handler.push_application() + + def tearDown(self): + self.log_handler.pop_application() @timed(DEFAULT_TIMEOUT) def test_source_and_orders(self): From 2d85ec4d953784e5b74c6146f5d88f42440804e1 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Mon, 11 Jun 2012 13:54:00 -0400 Subject: [PATCH 10/15] Update performance tracker ot use Logbook --- zipline/finance/performance.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index e237ec7b..d0dbb643 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -120,7 +120,7 @@ omitted). """ -import logging +import logbook import datetime import pytz import math @@ -130,7 +130,7 @@ import zmq import zipline.protocol as zp import zipline.finance.risk as risk -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('Performance') class PerformanceTracker(object): """ @@ -207,7 +207,7 @@ class PerformanceTracker(object): sock.connect(self.results_addr) self.results_socket = sock else: - LOGGER.warn("Not streaming results because no results socket given") + log.warn("Not streaming results because no results socket given") def publish_to(self, results_addr): """ @@ -293,8 +293,8 @@ class PerformanceTracker(object): returns = self.todays_performance.returns max_dd = -1 * self.trading_environment.max_drawdown if returns < max_dd: - LOGGER.info(str(returns) + " broke through " + str(max_dd)) - LOGGER.info("Exceeded max drawdown.") + log.info(str(returns) + " broke through " + str(max_dd)) + log.info("Exceeded max drawdown.") # mark the perf period with max loss flag, # so it shows up in the update, but don't end the test # here. Let the update go out before stopping @@ -329,8 +329,8 @@ class PerformanceTracker(object): """ log_msg = "Simulated {n} trading days out of {m}." - LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) - LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) + log.info(log_msg.format(n=self.day_count, m=self.total_days)) + log.info("first open: {d}".format(d=self.trading_environment.first_open)) # the stream will end on the last trading day, but will not trigger # an end of day, so we trigger the final market close here. @@ -345,7 +345,7 @@ class PerformanceTracker(object): ) if self.results_socket: - LOGGER.info("about to stream the risk report...") + log.info("about to stream the risk report...") risk_dict = self.risk_report.to_dict() msg = zp.RISK_FRAME(risk_dict) From 06e6207020d936f0049009ea3e985bf36be04bda Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Tue, 12 Jun 2012 07:11:52 -0400 Subject: [PATCH 11/15] Update logging system --- zipline/core/component.py | 2 +- zipline/core/host.py | 4 +- zipline/core/monitor.py | 102 ++------------------------------- zipline/finance/performance.py | 3 + 4 files changed, 12 insertions(+), 99 deletions(-) diff --git a/zipline/core/component.py b/zipline/core/component.py index df3de619..2d0619ce 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -357,7 +357,7 @@ class Component(object): #notify internal work look that we're done self.done = True # TODO: use state flag - log.info("[%s] DONE" % self.get_id) + #log.info("[%s] DONE" % self.get_id) # ----------- # Messaging diff --git a/zipline/core/host.py b/zipline/core/host.py index 3320bfbc..96ab5c05 100644 --- a/zipline/core/host.py +++ b/zipline/core/host.py @@ -100,11 +100,11 @@ class ComponentHost(Component): self.sockets.append(self.sync_socket) def open(self): - log.info('== Roll Call ==\n') + log.info('== Roll Call ==') for component in self.components.itervalues(): log.info(component) - log.info('== End Roll Call ==\n') + log.info('== End Roll Call ==') for component in self.components.itervalues(): self.launch_component(component) diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index eafa07bb..c274df20 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -7,106 +7,13 @@ import gevent_zeromq from collections import OrderedDict +from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \ CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \ -states = CONTROL_STATES - -from zipline.utils.gpoll import _Poller as GeventPoller - -# Roll Call ( Discovery ) -# ----------------------- -# -# Controller ( 'foo', 'bar', 'fizz', 'pop' ) -# ------------------ -# | | | | -# +---+ -# | 0 | ? ? ? -# +---+ -# | -# IDENTITY: foo -# get message: PROTOCOL.HEARTBEAT -# reply with PROTOCOL.OK -# -# Controller topology = ( 'foo', 'bar', 'fizz', 'pop' ) -# 'foo' in topology = YES -> -# track 'foo' -# ------------------ -# | | | | -# +---+ -# | 1 | ? ? ? -# +---+ - -# Heartbeating -# ------------ -# -# Controller ( time = 2.717828 ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 0 | | 0 | | 0 | | 0 | -# +---+ +---+ +---+ +---+ -# | -# IDENTITY: foo -# get message: time = 2.717828 -# reply with [ foo, 2.71828 ] -# -# Controller ( foo.status = OK ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 1 | | 0 | | 0 | | 0 | -# +---+ +---+ +---+ +---+ -# | -# Controller tracks this node as good -# for this heartbeat - -# Shutdown -# -------- -# -# Controller ( state = RUNNING ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 1 | | 1 | | 1 | | 1 | -# +---+ +---+ +---+ +---+ -# | -# IDENTITY: foo -# send [ DONE ] - -# Controller ( state = SHUTDOWN ) -# Controller topology.remove('foo') -# ------------------ -# | | | -# +---+ +---+ +---+ +---+ -# | | | 1 | | 1 | | 1 | -# +---+ +---+ +---+ +---+ -# | -# IDENTITY: foo -# yield, stop sending messages - -# Termination -# ------------ -# -# Controller ( state = TERMINATE ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 1 | | 1 | | 1 | | 1 | -# +---+ +---+ +---+ +---+ -# | -# get message PROTOCOL.KILL - -# Controller ( state = TERMINATE ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 0 | | 0 | | 0 | | 0 | -# +---+ +---+ +---+ +---+ - INIT, SOURCES_READY, RUNNING, TERMINATE = CONTROL_STATES -state_transitions = frozenset([ +CONTROLLER_TRANSITIONS = frozenset([ (-1 , INIT), (INIT , SOURCES_READY), (SOURCES_READY , RUNNING), @@ -233,7 +140,7 @@ class Controller(object): def state(self, new): old, self._state = self._state, new - if (old, new) not in state_transitions: + if (old, new) not in CONTROLLER_TRANSITIONS: raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) else: log.error("State Transition : %s -> %s" %(old, new)) @@ -572,6 +479,9 @@ class Controller(object): def shutdown(self, hard=False, soft=True, context=None): + if self.state is CONTROL_STATES.TERMINATE: + return + if not self.polling: return diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index d0dbb643..c4b08eee 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -287,12 +287,15 @@ class PerformanceTracker(object): if self.results_socket: msg = zp.PERF_FRAME(self.to_dict()) self.results_socket.send(msg) + else: + log.info(self.to_dict()) # if self.trading_environment.max_drawdown: returns = self.todays_performance.returns max_dd = -1 * self.trading_environment.max_drawdown if returns < max_dd: + print 0/0 log.info(str(returns) + " broke through " + str(max_dd)) log.info("Exceeded max drawdown.") # mark the perf period with max loss flag, From 20f7affc76c6300a7e53357aa6dc4825bf79d7bd Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Tue, 12 Jun 2012 07:38:18 -0400 Subject: [PATCH 12/15] Added logbook to reqs. --- etc/requirements.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/etc/requirements.txt b/etc/requirements.txt index 236073f7..9ac342c4 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -13,3 +13,6 @@ setuptools==0.6c11 # Unix setproctitle==1.1.6 + +# Logging +Logbook=0.3 From 1b2ac74decaf6685f75f29c24d03382a7a1f20b0 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Tue, 12 Jun 2012 07:42:35 -0400 Subject: [PATCH 13/15] Updated test config. --- etc/requirements.txt | 2 +- jenkins_setup.cfg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/requirements.txt b/etc/requirements.txt index 9ac342c4..3163bc7d 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -15,4 +15,4 @@ setuptools==0.6c11 setproctitle==1.1.6 # Logging -Logbook=0.3 +Logbook==0.3 diff --git a/jenkins_setup.cfg b/jenkins_setup.cfg index a6f24289..3798b1be 100644 --- a/jenkins_setup.cfg +++ b/jenkins_setup.cfg @@ -2,7 +2,7 @@ verbosity=2 detailed-errors=1 -with-xcoverage=1 +#with-xcoverage=1 cover-package=zipline cover-erase=1 cover-html=1 From 61ee2420ebeadee75201075e9a669c9b1d07f1b8 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Tue, 12 Jun 2012 13:31:48 -0400 Subject: [PATCH 14/15] Removed dummy print statement. --- zipline/finance/performance.py | 1 - 1 file changed, 1 deletion(-) diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index c4b08eee..03320e09 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -295,7 +295,6 @@ class PerformanceTracker(object): returns = self.todays_performance.returns max_dd = -1 * self.trading_environment.max_drawdown if returns < max_dd: - print 0/0 log.info(str(returns) + " broke through " + str(max_dd)) log.info("Exceeded max drawdown.") # mark the perf period with max loss flag, From 9c9cddb2e8b5bad71d0c0d876dc0f3e7bce0973a Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Wed, 13 Jun 2012 16:21:43 -0400 Subject: [PATCH 15/15] Tidy up logging statements. --- zipline/core/monitor.py | 4 ++-- zipline/finance/performance.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index c274df20..bfb13037 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -143,7 +143,7 @@ class Controller(object): if (old, new) not in CONTROLLER_TRANSITIONS: raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) else: - log.error("State Transition : %s -> %s" %(old, new)) + log.info("State Transition : %s -> %s" %(old, new)) def run(self): self.running = True @@ -364,7 +364,7 @@ class Controller(object): fail_handlers = { } if component in self.topology or self.freeform: - log.error('Component "%s" timed out' % component) + log.warning('Component "%s" timed out' % component) self.tracked.remove(component) fail_handlers.get(component, universal)() diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 03320e09..34dfae89 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -288,7 +288,7 @@ class PerformanceTracker(object): msg = zp.PERF_FRAME(self.to_dict()) self.results_socket.send(msg) else: - log.info(self.to_dict()) + log.debug(self.to_dict()) # if self.trading_environment.max_drawdown: