diff --git a/etc/requirements.txt b/etc/requirements.txt index 236073f7..3163bc7d 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -13,3 +13,6 @@ setuptools==0.6c11 # Unix setproctitle==1.1.6 + +# Logging +Logbook==0.3 diff --git a/jenkins_setup.cfg b/jenkins_setup.cfg index a6f24289..3798b1be 100644 --- a/jenkins_setup.cfg +++ b/jenkins_setup.cfg @@ -2,7 +2,7 @@ verbosity=2 detailed-errors=1 -with-xcoverage=1 +#with-xcoverage=1 cover-package=zipline cover-erase=1 cover-html=1 diff --git a/tests/test_optimize.py b/tests/test_optimize.py index 35c1943b..cefafd8d 100644 --- a/tests/test_optimize.py +++ b/tests/test_optimize.py @@ -2,12 +2,9 @@ from unittest2 import TestCase, skip from nose.tools import timed from collections import defaultdict -import logging import numpy as np -from zipline.utils.logger import configure_logging - from zipline.core.devsimulator import AddressAllocator from zipline.optimize.factory import create_predictable_zipline @@ -15,7 +12,8 @@ DEFAULT_TIMEOUT = 15 # seconds EXTENDED_TIMEOUT = 90 allocator = AddressAllocator(1000) -LOGGER = logging.getLogger('ZiplineLogger') + +from logbook.compat import LoggingHandler class TestUpDown(TestCase): """This unittest verifies that the BuySellAlgorithm in @@ -26,14 +24,18 @@ class TestUpDown(TestCase): leased_sockets = defaultdict(list) def setUp(self): - configure_logging() self.zipline_test_config = { - 'allocator':allocator, - 'sid':133, - 'trade_count':5, - 'amplitude':30, - 'base_price':50 + 'allocator' : allocator, + 'sid' : 133, + 'trade_count' : 5, + 'amplitude' : 30, + 'base_price' : 50 } + self.log_handler = LoggingHandler() + self.log_handler.push_application() + + def tearDown(self): + self.log_handler.pop_application() @timed(DEFAULT_TIMEOUT) def test_source_and_orders(self): diff --git a/zipline/components/aggregator.py b/zipline/components/aggregator.py index 5aadb7e8..a7bd6c82 100644 --- a/zipline/components/aggregator.py +++ b/zipline/components/aggregator.py @@ -9,16 +9,30 @@ Abstract base class for Feed and Merge. Feed Merge """ -import logging -from collections import Counter +import logbook import zipline.protocol as zp from zipline.core.component import Component from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ CONTROL_FRAME, CONTROL_UNFRAME +from zipline.utils.protocol_utils import Enum +from zipline.transitions import WorkflowMeta -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('Aggregate') + +# ================= +# State Transitions +# ================= + +INIT, READY, DRAINING = AGGREGATE_STATES = \ +Enum( 'INIT', 'READY', 'DRAINING') + +AGGREGATE_TRANSITIONS = dict( + do_start = (-1 , INIT) , + do_run = (INIT , READY) , + do_drain = (READY , DRAINING) , +) class Aggregate(Component): """ @@ -32,6 +46,9 @@ class Aggregate(Component): Feed and Merge define these differently. """ + abstract = True + __metaclass__ = WorkflowMeta + @property def get_type(self): return COMPONENT_TYPE.CONDUIT @@ -76,22 +93,21 @@ class Aggregate(Component): if len(self.data_buffer) == self.ds_finished_counter: #drain any remaining messages in the buffer - LOGGER.debug("draining feed") + log.debug("Draining Feed") self.drain() self.signal_done() else: try: event = self.unframe(message) - # deserialization error except zp.INVALID_DATASOURCE_FRAME as exc: + # Error deserializing return self.signal_exception(exc) try: self.append(event) self.send_next() - - # Invalid message except zp.INVALID_DATASOURCE_FRAME as exc: + # Invalid message return self.signal_exception(exc) # ------------- @@ -102,7 +118,7 @@ class Aggregate(Component): """ Send all messages in the buffer. """ - self.draining = True + self.state = DRAINING while self.pending_messages() > 0: self.send_next() @@ -114,7 +130,8 @@ class Aggregate(Component): return event = self.next() - if(event != None): + + if event: self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK) self.sent_counters[event.source_id] += 1 self.sent_count += 1 diff --git a/zipline/components/feed.py b/zipline/components/feed.py index 1099f257..f51e515d 100644 --- a/zipline/components/feed.py +++ b/zipline/components/feed.py @@ -1,14 +1,15 @@ -import logging +import logbook from collections import Counter -from zipline.core.component import Component -from zipline.components.aggregator import Aggregate +from zipline.components.aggregator import Aggregate, \ + AGGREGATE_STATES, AGGREGATE_TRANSITIONS import zipline.protocol as zp -from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ - CONTROL_FRAME, CONTROL_UNFRAME +log = logbook.Logger('Feed') -LOGGER = logging.getLogger('ZiplineLogger') +# ========= +# Component +# ========= class Feed(Aggregate): """ @@ -18,24 +19,31 @@ class Feed(Aggregate): one execution context (thread, process, etc) and run in another. """ + states = list(AGGREGATE_STATES) + transitions = AGGREGATE_TRANSITIONS + initial_state = -1 + def init(self): self.sent_count = 0 self.received_count = 0 - self.draining = False self.ds_finished_counter = 0 - # Depending on the size of this, might want to use a data - # structure with better asymptotics. self.data_buffer = {} # source_id -> integer count self.sent_counters = Counter() self.recv_counters = Counter() + self.state = AGGREGATE_STATES.INIT + @property def get_id(self): return "FEED" + @property + def draining(self): + return self.state == AGGREGATE_STATES.DRAINING + # ------- # Sockets # ------- @@ -71,6 +79,8 @@ class Feed(Aggregate): """ Get the next message in chronological order. """ + + # is_full and draining defined in aggregator if not(self.is_full() or self.draining): return diff --git a/zipline/components/merge.py b/zipline/components/merge.py index 8a2ae7c1..bbb9dc6a 100644 --- a/zipline/components/merge.py +++ b/zipline/components/merge.py @@ -1,6 +1,6 @@ - import zipline.protocol as zp -from zipline.components.aggregator import Aggregate +from zipline.components.aggregator import Aggregate, \ + AGGREGATE_STATES, AGGREGATE_TRANSITIONS from collections import Counter @@ -8,6 +8,11 @@ class Merge(Aggregate): """ Merges multiple streams of events into single messages. """ + + states = list(AGGREGATE_STATES) + transitions = AGGREGATE_TRANSITIONS + initial_state = -1 + def init(self): self.sent_count = 0 self.received_count = 0 diff --git a/zipline/components/passthrough.py b/zipline/components/passthrough.py index 2b176774..e3f2f863 100644 --- a/zipline/components/passthrough.py +++ b/zipline/components/passthrough.py @@ -7,7 +7,7 @@ class PassthroughTransform(BaseTransform): """ def init(self): - self.state = { 'name': 'PASSTHROUGH' } + self.props = { 'name': 'PASSTHROUGH' } #TODO, could save some cycles by skipping the _UNFRAME call # and just setting value to original msg string. diff --git a/zipline/components/tradesimulation.py b/zipline/components/tradesimulation.py index 79312002..f130be0b 100644 --- a/zipline/components/tradesimulation.py +++ b/zipline/components/tradesimulation.py @@ -1,4 +1,4 @@ -import logging +import logbook import datetime import zipline.protocol as zp @@ -8,7 +8,7 @@ from zipline.core.component import Component from zipline.finance.trading import TransactionSimulator from zipline.utils.protocol_utils import ndict -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('TradeSimulation') class TradeSimulationClient(Component): @@ -74,7 +74,7 @@ class TradeSimulationClient(Component): self.finish_simulation() def finish_simulation(self): - LOGGER.info("Client is DONE!") + log.info("TradeSimulation is Done") # signal the performance tracker that the simulation has # ended. Perf will internally calculate the full risk report. self.perf.handle_simulation_end() diff --git a/zipline/core/component.py b/zipline/core/component.py index d5b14d96..2d0619ce 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -7,7 +7,7 @@ import sys import uuid import time import socket -import logging +import logbook import traceback import humanhash from setproctitle import setproctitle @@ -23,7 +23,7 @@ from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \ COMPONENT_FAILURE, CONTROL_FRAME -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('Component') from zipline.exceptions import ComponentNoInit from zipline.transitions import WorkflowMeta @@ -357,7 +357,7 @@ class Component(object): #notify internal work look that we're done self.done = True # TODO: use state flag - LOGGER.info("[%s] DONE" % self.get_id) + #log.info("[%s] DONE" % self.get_id) # ----------- # Messaging @@ -488,6 +488,29 @@ class Component(object): self.sockets.append(self.sync_socket) + # ----------- + # FSM Actions + # ----------- + + #@property + #def state(self): + #if not hasattr(self, '_state'): + #self._state = self.initial_state + #else: + #return self._state + + #@state.setter + #def state(self, new): + #if not hasattr(self, '_state'): + #self._state = self.initial_state + + #old = self._state + + #if (old, new) in self.workflow: + #self._state = new + #else: + #raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) + # --------------------- # Description and Debug # --------------------- diff --git a/zipline/core/host.py b/zipline/core/host.py index 16126465..96ab5c05 100644 --- a/zipline/core/host.py +++ b/zipline/core/host.py @@ -1,4 +1,4 @@ -import logging +import logbook import datetime from component import Component @@ -8,7 +8,7 @@ from zipline.components import Feed, Merge, PassthroughTransform, \ DataSource from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('Host') class ComponentHost(Component): """ @@ -89,7 +89,7 @@ class ComponentHost(Component): """ Setup the sync socket and poller. ( Bind ) """ - #LOGGER.debug("Connecting sync server.") + #log.debug("Connecting sync server.") self.sync_socket = self.context.socket(self.zmq.REP) self.sync_socket.bind(self.addresses['sync_address']) @@ -100,11 +100,11 @@ class ComponentHost(Component): self.sockets.append(self.sync_socket) def open(self): - LOGGER.info('== Roll Call ==\n') + log.info('== Roll Call ==') for component in self.components.itervalues(): - LOGGER.info(component) + log.info(component) - LOGGER.info('== End Roll Call ==\n') + log.info('== End Roll Call ==') for component in self.components.itervalues(): self.launch_component(component) @@ -117,7 +117,7 @@ class ComponentHost(Component): """ if len(self.components) == 0: - LOGGER.info("Component register is empty.") + log.info("Component register is empty.") return False return True @@ -138,14 +138,15 @@ class ComponentHost(Component): except ValueError as exc: self.signal_exception(exc) - if status == str(CONTROL_PROTOCOL.DONE): # TODO: other way around - LOGGER.debug("{id} is DONE".format(id=sync_id)) + # TODO: other way around + if status == str(CONTROL_PROTOCOL.DONE): + #log.debug("Component claims done: {id}".format(id=sync_id)) self.unregister_component(sync_id) self.state_flag = COMPONENT_STATE.DONE else: self.sync_register[sync_id] = datetime.datetime.utcnow() - #qutil.LOGGER.info("confirmed {id}".format(id=msg)) + #log.info("confirmed {id}".format(id=msg)) # send synchronization reply self.sync_socket.send('ack', self.zmq.NOBLOCK) diff --git a/zipline/core/monitor.py b/zipline/core/monitor.py index 5b2384f4..bfb13037 100644 --- a/zipline/core/monitor.py +++ b/zipline/core/monitor.py @@ -2,111 +2,18 @@ import zmq import time import gevent import itertools -import logging +import logbook import gevent_zeromq from collections import OrderedDict +from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \ CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \ -states = CONTROL_STATES - -from zipline.utils.gpoll import _Poller as GeventPoller - -# Roll Call ( Discovery ) -# ----------------------- -# -# Controller ( 'foo', 'bar', 'fizz', 'pop' ) -# ------------------ -# | | | | -# +---+ -# | 0 | ? ? ? -# +---+ -# | -# IDENTITY: foo -# get message: PROTOCOL.HEARTBEAT -# reply with PROTOCOL.OK -# -# Controller topology = ( 'foo', 'bar', 'fizz', 'pop' ) -# 'foo' in topology = YES -> -# track 'foo' -# ------------------ -# | | | | -# +---+ -# | 1 | ? ? ? -# +---+ - -# Heartbeating -# ------------ -# -# Controller ( time = 2.717828 ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 0 | | 0 | | 0 | | 0 | -# +---+ +---+ +---+ +---+ -# | -# IDENTITY: foo -# get message: time = 2.717828 -# reply with [ foo, 2.71828 ] -# -# Controller ( foo.status = OK ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 1 | | 0 | | 0 | | 0 | -# +---+ +---+ +---+ +---+ -# | -# Controller tracks this node as good -# for this heartbeat - -# Shutdown -# -------- -# -# Controller ( state = RUNNING ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 1 | | 1 | | 1 | | 1 | -# +---+ +---+ +---+ +---+ -# | -# IDENTITY: foo -# send [ DONE ] - -# Controller ( state = SHUTDOWN ) -# Controller topology.remove('foo') -# ------------------ -# | | | -# +---+ +---+ +---+ +---+ -# | | | 1 | | 1 | | 1 | -# +---+ +---+ +---+ +---+ -# | -# IDENTITY: foo -# yield, stop sending messages - -# Termination -# ------------ -# -# Controller ( state = TERMINATE ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 1 | | 1 | | 1 | | 1 | -# +---+ +---+ +---+ +---+ -# | -# get message PROTOCOL.KILL - -# Controller ( state = TERMINATE ) -# ------------------ -# | | | | -# +---+ +---+ +---+ +---+ -# | 0 | | 0 | | 0 | | 0 | -# +---+ +---+ +---+ +---+ - INIT, SOURCES_READY, RUNNING, TERMINATE = CONTROL_STATES -state_transitions = frozenset([ +CONTROLLER_TRANSITIONS = frozenset([ (-1 , INIT), (INIT , SOURCES_READY), (SOURCES_READY , RUNNING), @@ -122,6 +29,9 @@ class UnknownChatter(Exception): return """Component calling itself "%s" talking on unexpected channel"""\ % self.named + +log = logbook.Logger('Controller') + class Controller(object): """ A N to M messaging system for inter component communication. @@ -133,9 +43,6 @@ class Controller(object): the individual components. :func message_sender: . - :param logging: Logging interface for tracking broker state - Defaults to None - Topology is the set of components we expect to show up. States are the transitions the sytems go through. The simplest is from RUNNING -> NOT RUNNING . @@ -159,7 +66,7 @@ class Controller(object): debug = False period = 1 - def __init__(self, pub_socket, route_socket, logger = None): + def __init__(self, pub_socket, route_socket): self.context = None self.zmq = None @@ -182,12 +89,6 @@ class Controller(object): self.error_replay = OrderedDict() - if logger: - self.logging = logger - else: - default_logger = logging.getLogger('ZiplineLogger') - self.logging = default_logger - def init_zmq(self, flavor): assert self.zmq_flavor in ['thread', 'mp', 'green'] @@ -239,10 +140,10 @@ class Controller(object): def state(self, new): old, self._state = self._state, new - if (old, new) not in state_transitions: - raise RuntimeError("[Controller] Invalid State Transition : %s -> %s" %(old, new)) + if (old, new) not in CONTROLLER_TRANSITIONS: + raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) else: - self.logging.info("[Controller] State Transition : %s -> %s" %(old, new)) + log.info("State Transition : %s -> %s" %(old, new)) def run(self): self.running = True @@ -251,13 +152,13 @@ class Controller(object): try: return self._poll() # use a python loop except KeyboardInterrupt: - self.logging.info('Shutdown event loop') + log.debug('Shutdown event loop') def log_status(self): """ Snapshot of the tracked components at every period. """ - #self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],)) + #log.info("Tracking component : %s" % ([c for c in self.tracked],)) pass def replay_errors(self): @@ -366,11 +267,11 @@ class Controller(object): self.handle_recv(buffer[:]) buffer = [] except INVALID_CONTROL_FRAME: - self.logging.error('Invalid frame', rawmessage) + log.error('Invalid frame', rawmessage) pass if socks.get(self.cancel) == self.zmq.POLLIN: - self.logging.info('[Controller] Received Cancellation') + log.info('Received Cancellation') rawmessage = self.cancel.recv() self.cancel.send('') self.shutdown(soft=True) @@ -430,7 +331,7 @@ class Controller(object): if self.state is CONTROL_STATES.TERMINATE: return - self.logging.info('[Controller] Now Tracking "%s" ' % component) + log.info(' Now Tracking "%s" ' % component) universal = self.new_universal init_handlers = { @@ -452,7 +353,7 @@ class Controller(object): def fail_universal(self): pass # TODO: this requires higher order functionality - #self.logging.error('[Controller] System in exception state, shutting down') + #log.error('System in exception state, shutting down') #self.shutdown(soft=True) def fail(self, component): @@ -463,7 +364,7 @@ class Controller(object): fail_handlers = { } if component in self.topology or self.freeform: - self.logging.info('[Controller] Component "%s" timed out' % component) + log.warning('Component "%s" timed out' % component) self.tracked.remove(component) fail_handlers.get(component, universal)() @@ -472,7 +373,7 @@ class Controller(object): # ------------------- def done(self, component): - self.logging.info('[Controller] Component "%s" done.' % component) + log.info('Component "%s" signaled done.' % component) # -------------- # Error Handling @@ -482,7 +383,7 @@ class Controller(object): """ Shutdown the system on failure. """ - self.logging.error('[Controller] System in exception state, shutting down') + log.error('System in exception state, shutting down') self.shutdown(soft=True) def exception(self, component, failure): @@ -491,7 +392,7 @@ class Controller(object): if component in self.topology or self.freeform: self.error_replay[(component, time.time())] = failure - self.logging.error('[Controller] Component "%s" in exception state' % component) + log.error('Component in exception state: %s' % component) exception_handlers.get(component, universal)() else: @@ -517,8 +418,9 @@ class Controller(object): self.responses.add(identity) else: # Otherwise its something weird and we don't know - # what to do so just say so - self.logging.error("Weird stuff happened: %s" % msg) + # what to do so just say so, probably line noise + # from ZeroMQ + log.error("Weird stuff happened: %s" % msg) # A component is telling us it failed, and how if id is CONTROL_PROTOCOL.EXCEPTION: @@ -572,11 +474,14 @@ class Controller(object): def do_error_replay(self): for (component, time), error in self.error_replay.iteritems(): - self.logging.info('[Controller] Error Log for -- %s --:\n%s' % + log.info('Error Log for -- %s --:\n%s' % (component, error)) def shutdown(self, hard=False, soft=True, context=None): + if self.state is CONTROL_STATES.TERMINATE: + return + if not self.polling: return @@ -587,7 +492,7 @@ class Controller(object): if hard: self.state = CONTROL_STATES.TERMINATE - self.logging.info('[Controller] Hard Shutdown') + log.info('Hard Shutdown') #for asoc in self.associated: #asoc.close() @@ -595,7 +500,7 @@ class Controller(object): if soft: self.state = CONTROL_STATES.TERMINATE - self.logging.info('[Controller] Soft Shutdown') + log.info('Soft Shutdown') self.send_softkill() #for asoc in self.associated: diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 5a2aeaef..34dfae89 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -59,11 +59,20 @@ Position Tracking +-----------------+----------------------------------------------------+ | last_sale_price | price at last sale of the security on the exchange | +-----------------+----------------------------------------------------+ + | cost_basis | the volume weighted average price paid per share | + +-----------------+----------------------------------------------------+ + Performance Period ================== +Performance Periods are updated with every trade. When calling +code needs a portfolio object that fulfills the algorithm +protocol, use the PerformancePeriod.as_portfolio method. See that +method for comments on the specific fields provided (and +omitted). + +---------------+------------------------------------------------------+ | key | value | +===============+======================================================+ @@ -111,7 +120,7 @@ Performance Period """ -import logging +import logbook import datetime import pytz import math @@ -121,7 +130,7 @@ import zmq import zipline.protocol as zp import zipline.finance.risk as risk -LOGGER = logging.getLogger('ZiplineLogger') +log = logbook.Logger('Performance') class PerformanceTracker(object): """ @@ -190,7 +199,7 @@ class PerformanceTracker(object): ) def get_portfolio(self): - return self.cumulative_performance.to_ndict() + return self.cumulative_performance.as_portfolio() def open(self, context): if self.results_addr: @@ -198,7 +207,7 @@ class PerformanceTracker(object): sock.connect(self.results_addr) self.results_socket = sock else: - LOGGER.warn("Not streaming results because no results socket given") + log.warn("Not streaming results because no results socket given") def publish_to(self, results_addr): """ @@ -247,12 +256,12 @@ class PerformanceTracker(object): self.cumulative_performance.update_last_sale(event) self.todays_performance.update_last_sale(event) - - def handle_market_close(self): #calculate performance as of last trade self.cumulative_performance.calculate_performance() self.todays_performance.calculate_performance() + def handle_market_close(self): + # add the return results from today to the list of DailyReturn objects. todays_date = self.market_close.replace(hour=0, minute=0, second=0) todays_return_obj = risk.DailyReturn( @@ -278,14 +287,16 @@ class PerformanceTracker(object): if self.results_socket: msg = zp.PERF_FRAME(self.to_dict()) self.results_socket.send(msg) + else: + log.debug(self.to_dict()) # if self.trading_environment.max_drawdown: returns = self.todays_performance.returns max_dd = -1 * self.trading_environment.max_drawdown if returns < max_dd: - LOGGER.info(str(returns) + " broke through " + str(max_dd)) - LOGGER.info("Exceeded max drawdown.") + log.info(str(returns) + " broke through " + str(max_dd)) + log.info("Exceeded max drawdown.") # mark the perf period with max loss flag, # so it shows up in the update, but don't end the test # here. Let the update go out before stopping @@ -320,8 +331,8 @@ class PerformanceTracker(object): """ log_msg = "Simulated {n} trading days out of {m}." - LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) - LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) + log.info(log_msg.format(n=self.day_count, m=self.total_days)) + log.info("first open: {d}".format(d=self.trading_environment.first_open)) # the stream will end on the last trading day, but will not trigger # an end of day, so we trigger the final market close here. @@ -336,7 +347,7 @@ class PerformanceTracker(object): ) if self.results_socket: - LOGGER.info("about to stream the risk report...") + log.info("about to stream the risk report...") risk_dict = self.risk_report.to_dict() msg = zp.RISK_FRAME(risk_dict) @@ -490,14 +501,7 @@ class PerformancePeriod(object): self.positions[event.sid].last_sale_price = event.price self.positions[event.sid].last_sale_date = event.dt - def to_dict(self): - """ - Creates a dictionary representing the state of this performance - period. See header comments for a detailed description. - """ - positions = self.get_positions_list() - transactions = [x.as_dict() for x in self.processed_transactions] - + def __core_dict(self): rval = { 'ending_value' : self.ending_value, 'capital_used' : self.period_capital_used, @@ -508,46 +512,69 @@ class PerformancePeriod(object): 'cumulative_capital_used' : self.cumulative_capital_used, 'max_capital_used' : self.max_capital_used, 'max_leverage' : self.max_leverage, - 'positions' : positions, 'pnl' : self.pnl, 'returns' : self.returns, - 'transactions' : transactions, 'period_open' : self.period_open, 'period_close' : self.period_close } + return rval + + + def to_dict(self): + """ + Creates a dictionary representing the state of this performance + period. See header comments for a detailed description. + """ + rval = self.__core_dict() + positions = self.get_positions_list() + rval['positions'] = positions + # we want the key to be absent, not just empty - if not self.keep_transactions: - del rval['transactions'] + if self.keep_transactions: + transactions = [x.as_dict() for x in self.processed_transactions] + rval['transactions'] = transactions return rval - def to_ndict(self): + def as_portfolio(self): """ - Creates a ndict representing the state of this perfomance period. - Properties are the same as the results of to_dict. See header comments - for a detailed description. - + The purpose of this method is to provide a portfolio + object to algorithms running inside the same trading + client. The data needed is captured raw in a + PerformancePeriod, and in this method we rename some + fields for usability and remove extraneous fields. """ - positions = self.get_positions(ndicted=True) + portfolio = self.__core_dict() + # rename: + # ending_cash -> cash + # period_open -> backtest_start + # + # remove: + # period_close, starting_value, + # cumulative_capital_used, max_leverage, max_capital_used + portfolio['cash'] = portfolio['ending_cash'] + portfolio['start_date'] = portfolio['period_open'] + portfolio['position_value'] = portfolio['ending_value'] - positions = zp.ndict(positions) + del(portfolio['ending_cash']) + del(portfolio['period_open']) + del(portfolio['period_close']) + del(portfolio['starting_value']) + del(portfolio['ending_value']) + del(portfolio['cumulative_capital_used']) + del(portfolio['max_leverage']) + del(portfolio['max_capital_used']) - return zp.ndict({ - 'ending_value' : self.ending_value, - 'capital_used' : self.period_capital_used, - 'starting_value' : self.starting_value, - 'starting_cash' : self.starting_cash, - 'ending_cash' : self.ending_cash, - 'cumulative_capital_used' : self.cumulative_capital_used, - 'max_capital_used' : self.max_capital_used, - 'max_leverage' : self.max_leverage, - 'positions' : positions, - 'transactions' : self.processed_transactions - }) + portfolio['positions'] = self.get_positions(ndicted=True) + return zp.ndict(portfolio) def get_positions(self, ndicted=False): - positions = {} + if ndicted: + positions = zp.ndict({}) + else: + positions = {} + for sid, pos in self.positions.iteritems(): cur = pos.to_dict() if ndicted: diff --git a/zipline/finance/risk.py b/zipline/finance/risk.py index f4bd736f..a63cf872 100644 --- a/zipline/finance/risk.py +++ b/zipline/finance/risk.py @@ -27,7 +27,7 @@ Risk Report | alpha | The _algorithm_ alpha to the benchmark. | +-----------------+----------------------------------------------------+ | excess_return | The excess return of the algorithm over the | - | | benchmark. | + | | treasuries. | +-----------------+----------------------------------------------------+ | max_drawdown | The largest relative peak to relative trough move | | | for the portfolio returns between self.start_date | @@ -187,6 +187,8 @@ class RiskMetrics(): return period_returns, returns def calculate_volatility(self, daily_returns): + # TODO: we should be using an annualized number for the + # square root, not the days in the period. return np.std(daily_returns, ddof=1) * math.sqrt(self.trading_days) def calculate_sharpe(self): diff --git a/zipline/finance/vwap.py b/zipline/finance/vwap.py index 35eb9578..09941fe1 100644 --- a/zipline/finance/vwap.py +++ b/zipline/finance/vwap.py @@ -7,20 +7,20 @@ from zipline.finance.movingaverage import EventWindow class VWAPTransform(BaseTransform): def init(self, name, daycount=3): - self.state = {} - self.state['name'] = name + self.props = {} + self.props['name'] = name self.daycount = daycount self.by_sid = defaultdict(self.create_vwap) @property def get_id(self): - return self.state['name'] + return self.props['name'] def transform(self, event): cur = self.by_sid[event.sid] cur.update(event) - self.state['value'] = cur.vwap - return self.state + self.props['value'] = cur.vwap + return self.props def create_vwap(self): return DailyVWAP(self.daycount) diff --git a/zipline/lines.py b/zipline/lines.py index 070931c5..a229c9f0 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -60,8 +60,6 @@ before invoking simulate. +---------------------------------+ """ -import logging - import zipline.utils.factory as factory from zipline.components import DataSource @@ -73,8 +71,6 @@ from zipline.core.devsimulator import Simulator from zipline.core.monitor import Controller from zipline.finance.trading import SIMULATION_STYLE -LOGGER = logging.getLogger('ZiplineLogger') - class SimulatedTrading(object): """ Zipline with:: @@ -133,7 +129,6 @@ class SimulatedTrading(object): self.con = Controller( sockets[6], sockets[7], - logger = LOGGER ) self.con.cancel_socket = self.allocator.lease(1)[0] diff --git a/zipline/transforms/base.py b/zipline/transforms/base.py index 082e52b9..2c8ab856 100644 --- a/zipline/transforms/base.py +++ b/zipline/transforms/base.py @@ -25,7 +25,7 @@ class BaseTransform(Component): @property def get_id(self): - return self.state['name'] + return self.props['name'] @property def get_type(self): @@ -116,9 +116,9 @@ class BaseTransform(Component): Transforms run in parallel and results are merged into a single map, so transform names must be unique. Best practice - is to use the self.state object initialized from the transform + is to use the self.props object initialized from the transform configuration, and only set the transformed value:: - self.state['value'] = transformed_value + self.props['value'] = transformed_value """ raise NotImplementedError diff --git a/zipline/transitions.py b/zipline/transitions.py index 495a32d4..e47d54c8 100644 --- a/zipline/transitions.py +++ b/zipline/transitions.py @@ -39,24 +39,6 @@ class WorkflowMeta(type): """ Base metaclass component workflows. """ - @property - def state(self): - if not hasattr(self, '_state'): - self._state = self.initial_state - else: - return self._state - - @state.setter - def state(self, new): - if not hasattr(self, '_state'): - self._state = self.initial_state - - old = self._state - - if (old, new) in self.workflow: - self._state = new - else: - raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new)) def __new__(cls, name, mro, attrs): base = 'Component' @@ -71,7 +53,6 @@ class WorkflowMeta(type): raise RuntimeError('`workflow` is a reserved attribute.') if not state: - import pdb; pdb.set_trace() raise RuntimeError('Must specify states') if not transitions: diff --git a/zipline/utils/logger.py b/zipline/utils/logger.py index c3b97cda..b044642f 100644 --- a/zipline/utils/logger.py +++ b/zipline/utils/logger.py @@ -3,6 +3,7 @@ Small classes to assist with timezone calculations, LOGGER configuration, and other common operations. """ +# DEPRECATED DO NOT USE import logging import logging.config