mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-02 12:12:36 +08:00
Merge branch 'master' of github.com:quantopian/zipline
This commit is contained in:
@@ -13,3 +13,6 @@ setuptools==0.6c11
|
||||
|
||||
# Unix
|
||||
setproctitle==1.1.6
|
||||
|
||||
# Logging
|
||||
Logbook==0.3
|
||||
|
||||
+1
-1
@@ -2,7 +2,7 @@
|
||||
verbosity=2
|
||||
detailed-errors=1
|
||||
|
||||
with-xcoverage=1
|
||||
#with-xcoverage=1
|
||||
cover-package=zipline
|
||||
cover-erase=1
|
||||
cover-html=1
|
||||
|
||||
+12
-10
@@ -2,12 +2,9 @@
|
||||
from unittest2 import TestCase, skip
|
||||
from nose.tools import timed
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
|
||||
import numpy as np
|
||||
|
||||
from zipline.utils.logger import configure_logging
|
||||
|
||||
from zipline.core.devsimulator import AddressAllocator
|
||||
from zipline.optimize.factory import create_predictable_zipline
|
||||
|
||||
@@ -15,7 +12,8 @@ DEFAULT_TIMEOUT = 15 # seconds
|
||||
EXTENDED_TIMEOUT = 90
|
||||
|
||||
allocator = AddressAllocator(1000)
|
||||
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
|
||||
from logbook.compat import LoggingHandler
|
||||
|
||||
class TestUpDown(TestCase):
|
||||
"""This unittest verifies that the BuySellAlgorithm in
|
||||
@@ -26,14 +24,18 @@ class TestUpDown(TestCase):
|
||||
leased_sockets = defaultdict(list)
|
||||
|
||||
def setUp(self):
|
||||
configure_logging()
|
||||
self.zipline_test_config = {
|
||||
'allocator':allocator,
|
||||
'sid':133,
|
||||
'trade_count':5,
|
||||
'amplitude':30,
|
||||
'base_price':50
|
||||
'allocator' : allocator,
|
||||
'sid' : 133,
|
||||
'trade_count' : 5,
|
||||
'amplitude' : 30,
|
||||
'base_price' : 50
|
||||
}
|
||||
self.log_handler = LoggingHandler()
|
||||
self.log_handler.push_application()
|
||||
|
||||
def tearDown(self):
|
||||
self.log_handler.pop_application()
|
||||
|
||||
@timed(DEFAULT_TIMEOUT)
|
||||
def test_source_and_orders(self):
|
||||
|
||||
@@ -9,16 +9,30 @@ Abstract base class for Feed and Merge.
|
||||
Feed Merge
|
||||
|
||||
"""
|
||||
import logging
|
||||
from collections import Counter
|
||||
import logbook
|
||||
|
||||
import zipline.protocol as zp
|
||||
|
||||
from zipline.core.component import Component
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
|
||||
CONTROL_FRAME, CONTROL_UNFRAME
|
||||
from zipline.utils.protocol_utils import Enum
|
||||
from zipline.transitions import WorkflowMeta
|
||||
|
||||
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
log = logbook.Logger('Aggregate')
|
||||
|
||||
# =================
|
||||
# State Transitions
|
||||
# =================
|
||||
|
||||
INIT, READY, DRAINING = AGGREGATE_STATES = \
|
||||
Enum( 'INIT', 'READY', 'DRAINING')
|
||||
|
||||
AGGREGATE_TRANSITIONS = dict(
|
||||
do_start = (-1 , INIT) ,
|
||||
do_run = (INIT , READY) ,
|
||||
do_drain = (READY , DRAINING) ,
|
||||
)
|
||||
|
||||
class Aggregate(Component):
|
||||
"""
|
||||
@@ -32,6 +46,9 @@ class Aggregate(Component):
|
||||
Feed and Merge define these differently.
|
||||
"""
|
||||
|
||||
abstract = True
|
||||
__metaclass__ = WorkflowMeta
|
||||
|
||||
@property
|
||||
def get_type(self):
|
||||
return COMPONENT_TYPE.CONDUIT
|
||||
@@ -76,22 +93,21 @@ class Aggregate(Component):
|
||||
|
||||
if len(self.data_buffer) == self.ds_finished_counter:
|
||||
#drain any remaining messages in the buffer
|
||||
LOGGER.debug("draining feed")
|
||||
log.debug("Draining Feed")
|
||||
self.drain()
|
||||
self.signal_done()
|
||||
else:
|
||||
try:
|
||||
event = self.unframe(message)
|
||||
# deserialization error
|
||||
except zp.INVALID_DATASOURCE_FRAME as exc:
|
||||
# Error deserializing
|
||||
return self.signal_exception(exc)
|
||||
|
||||
try:
|
||||
self.append(event)
|
||||
self.send_next()
|
||||
|
||||
# Invalid message
|
||||
except zp.INVALID_DATASOURCE_FRAME as exc:
|
||||
# Invalid message
|
||||
return self.signal_exception(exc)
|
||||
|
||||
# -------------
|
||||
@@ -102,7 +118,7 @@ class Aggregate(Component):
|
||||
"""
|
||||
Send all messages in the buffer.
|
||||
"""
|
||||
self.draining = True
|
||||
self.state = DRAINING
|
||||
while self.pending_messages() > 0:
|
||||
self.send_next()
|
||||
|
||||
@@ -114,7 +130,8 @@ class Aggregate(Component):
|
||||
return
|
||||
|
||||
event = self.next()
|
||||
if(event != None):
|
||||
|
||||
if event:
|
||||
self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK)
|
||||
self.sent_counters[event.source_id] += 1
|
||||
self.sent_count += 1
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
import logging
|
||||
import logbook
|
||||
from collections import Counter
|
||||
|
||||
from zipline.core.component import Component
|
||||
from zipline.components.aggregator import Aggregate
|
||||
from zipline.components.aggregator import Aggregate, \
|
||||
AGGREGATE_STATES, AGGREGATE_TRANSITIONS
|
||||
import zipline.protocol as zp
|
||||
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
|
||||
CONTROL_FRAME, CONTROL_UNFRAME
|
||||
log = logbook.Logger('Feed')
|
||||
|
||||
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
# =========
|
||||
# Component
|
||||
# =========
|
||||
|
||||
class Feed(Aggregate):
|
||||
"""
|
||||
@@ -18,24 +19,31 @@ class Feed(Aggregate):
|
||||
one execution context (thread, process, etc) and run in another.
|
||||
"""
|
||||
|
||||
states = list(AGGREGATE_STATES)
|
||||
transitions = AGGREGATE_TRANSITIONS
|
||||
initial_state = -1
|
||||
|
||||
def init(self):
|
||||
self.sent_count = 0
|
||||
self.received_count = 0
|
||||
self.draining = False
|
||||
self.ds_finished_counter = 0
|
||||
|
||||
# Depending on the size of this, might want to use a data
|
||||
# structure with better asymptotics.
|
||||
self.data_buffer = {}
|
||||
|
||||
# source_id -> integer count
|
||||
self.sent_counters = Counter()
|
||||
self.recv_counters = Counter()
|
||||
|
||||
self.state = AGGREGATE_STATES.INIT
|
||||
|
||||
@property
|
||||
def get_id(self):
|
||||
return "FEED"
|
||||
|
||||
@property
|
||||
def draining(self):
|
||||
return self.state == AGGREGATE_STATES.DRAINING
|
||||
|
||||
# -------
|
||||
# Sockets
|
||||
# -------
|
||||
@@ -71,6 +79,8 @@ class Feed(Aggregate):
|
||||
"""
|
||||
Get the next message in chronological order.
|
||||
"""
|
||||
|
||||
# is_full and draining defined in aggregator
|
||||
if not(self.is_full() or self.draining):
|
||||
return
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
|
||||
import zipline.protocol as zp
|
||||
from zipline.components.aggregator import Aggregate
|
||||
from zipline.components.aggregator import Aggregate, \
|
||||
AGGREGATE_STATES, AGGREGATE_TRANSITIONS
|
||||
|
||||
from collections import Counter
|
||||
|
||||
@@ -8,6 +8,11 @@ class Merge(Aggregate):
|
||||
"""
|
||||
Merges multiple streams of events into single messages.
|
||||
"""
|
||||
|
||||
states = list(AGGREGATE_STATES)
|
||||
transitions = AGGREGATE_TRANSITIONS
|
||||
initial_state = -1
|
||||
|
||||
def init(self):
|
||||
self.sent_count = 0
|
||||
self.received_count = 0
|
||||
|
||||
@@ -7,7 +7,7 @@ class PassthroughTransform(BaseTransform):
|
||||
"""
|
||||
|
||||
def init(self):
|
||||
self.state = { 'name': 'PASSTHROUGH' }
|
||||
self.props = { 'name': 'PASSTHROUGH' }
|
||||
|
||||
#TODO, could save some cycles by skipping the _UNFRAME call
|
||||
# and just setting value to original msg string.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import logging
|
||||
import logbook
|
||||
import datetime
|
||||
|
||||
import zipline.protocol as zp
|
||||
@@ -8,7 +8,7 @@ from zipline.core.component import Component
|
||||
from zipline.finance.trading import TransactionSimulator
|
||||
from zipline.utils.protocol_utils import ndict
|
||||
|
||||
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
log = logbook.Logger('TradeSimulation')
|
||||
|
||||
class TradeSimulationClient(Component):
|
||||
|
||||
@@ -74,7 +74,7 @@ class TradeSimulationClient(Component):
|
||||
self.finish_simulation()
|
||||
|
||||
def finish_simulation(self):
|
||||
LOGGER.info("Client is DONE!")
|
||||
log.info("TradeSimulation is Done")
|
||||
# signal the performance tracker that the simulation has
|
||||
# ended. Perf will internally calculate the full risk report.
|
||||
self.perf.handle_simulation_end()
|
||||
|
||||
@@ -7,7 +7,7 @@ import sys
|
||||
import uuid
|
||||
import time
|
||||
import socket
|
||||
import logging
|
||||
import logbook
|
||||
import traceback
|
||||
import humanhash
|
||||
from setproctitle import setproctitle
|
||||
@@ -23,7 +23,7 @@ from zipline.utils.gpoll import _Poller as GeventPoller
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \
|
||||
COMPONENT_FAILURE, CONTROL_FRAME
|
||||
|
||||
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
log = logbook.Logger('Component')
|
||||
|
||||
from zipline.exceptions import ComponentNoInit
|
||||
from zipline.transitions import WorkflowMeta
|
||||
@@ -357,7 +357,7 @@ class Component(object):
|
||||
#notify internal work look that we're done
|
||||
self.done = True # TODO: use state flag
|
||||
|
||||
LOGGER.info("[%s] DONE" % self.get_id)
|
||||
#log.info("[%s] DONE" % self.get_id)
|
||||
|
||||
# -----------
|
||||
# Messaging
|
||||
@@ -488,6 +488,29 @@ class Component(object):
|
||||
|
||||
self.sockets.append(self.sync_socket)
|
||||
|
||||
# -----------
|
||||
# FSM Actions
|
||||
# -----------
|
||||
|
||||
#@property
|
||||
#def state(self):
|
||||
#if not hasattr(self, '_state'):
|
||||
#self._state = self.initial_state
|
||||
#else:
|
||||
#return self._state
|
||||
|
||||
#@state.setter
|
||||
#def state(self, new):
|
||||
#if not hasattr(self, '_state'):
|
||||
#self._state = self.initial_state
|
||||
|
||||
#old = self._state
|
||||
|
||||
#if (old, new) in self.workflow:
|
||||
#self._state = new
|
||||
#else:
|
||||
#raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new))
|
||||
|
||||
# ---------------------
|
||||
# Description and Debug
|
||||
# ---------------------
|
||||
|
||||
+11
-10
@@ -1,4 +1,4 @@
|
||||
import logging
|
||||
import logbook
|
||||
import datetime
|
||||
|
||||
from component import Component
|
||||
@@ -8,7 +8,7 @@ from zipline.components import Feed, Merge, PassthroughTransform, \
|
||||
DataSource
|
||||
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE
|
||||
|
||||
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
log = logbook.Logger('Host')
|
||||
|
||||
class ComponentHost(Component):
|
||||
"""
|
||||
@@ -89,7 +89,7 @@ class ComponentHost(Component):
|
||||
"""
|
||||
Setup the sync socket and poller. ( Bind )
|
||||
"""
|
||||
#LOGGER.debug("Connecting sync server.")
|
||||
#log.debug("Connecting sync server.")
|
||||
|
||||
self.sync_socket = self.context.socket(self.zmq.REP)
|
||||
self.sync_socket.bind(self.addresses['sync_address'])
|
||||
@@ -100,11 +100,11 @@ class ComponentHost(Component):
|
||||
self.sockets.append(self.sync_socket)
|
||||
|
||||
def open(self):
|
||||
LOGGER.info('== Roll Call ==\n')
|
||||
log.info('== Roll Call ==')
|
||||
for component in self.components.itervalues():
|
||||
LOGGER.info(component)
|
||||
log.info(component)
|
||||
|
||||
LOGGER.info('== End Roll Call ==\n')
|
||||
log.info('== End Roll Call ==')
|
||||
|
||||
for component in self.components.itervalues():
|
||||
self.launch_component(component)
|
||||
@@ -117,7 +117,7 @@ class ComponentHost(Component):
|
||||
"""
|
||||
|
||||
if len(self.components) == 0:
|
||||
LOGGER.info("Component register is empty.")
|
||||
log.info("Component register is empty.")
|
||||
return False
|
||||
|
||||
return True
|
||||
@@ -138,14 +138,15 @@ class ComponentHost(Component):
|
||||
except ValueError as exc:
|
||||
self.signal_exception(exc)
|
||||
|
||||
if status == str(CONTROL_PROTOCOL.DONE): # TODO: other way around
|
||||
LOGGER.debug("{id} is DONE".format(id=sync_id))
|
||||
# TODO: other way around
|
||||
if status == str(CONTROL_PROTOCOL.DONE):
|
||||
#log.debug("Component claims done: {id}".format(id=sync_id))
|
||||
self.unregister_component(sync_id)
|
||||
self.state_flag = COMPONENT_STATE.DONE
|
||||
else:
|
||||
self.sync_register[sync_id] = datetime.datetime.utcnow()
|
||||
|
||||
#qutil.LOGGER.info("confirmed {id}".format(id=msg))
|
||||
#log.info("confirmed {id}".format(id=msg))
|
||||
# send synchronization reply
|
||||
self.sync_socket.send('ack', self.zmq.NOBLOCK)
|
||||
|
||||
|
||||
+29
-124
@@ -2,111 +2,18 @@ import zmq
|
||||
import time
|
||||
import gevent
|
||||
import itertools
|
||||
import logging
|
||||
import logbook
|
||||
import gevent_zeromq
|
||||
|
||||
from collections import OrderedDict
|
||||
|
||||
from zipline.utils.gpoll import _Poller as GeventPoller
|
||||
from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \
|
||||
CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \
|
||||
|
||||
states = CONTROL_STATES
|
||||
|
||||
from zipline.utils.gpoll import _Poller as GeventPoller
|
||||
|
||||
# Roll Call ( Discovery )
|
||||
# -----------------------
|
||||
#
|
||||
# Controller ( 'foo', 'bar', 'fizz', 'pop' )
|
||||
# ------------------
|
||||
# | | | |
|
||||
# +---+
|
||||
# | 0 | ? ? ?
|
||||
# +---+
|
||||
# |
|
||||
# IDENTITY: foo
|
||||
# get message: PROTOCOL.HEARTBEAT
|
||||
# reply with PROTOCOL.OK
|
||||
#
|
||||
# Controller topology = ( 'foo', 'bar', 'fizz', 'pop' )
|
||||
# 'foo' in topology = YES ->
|
||||
# track 'foo'
|
||||
# ------------------
|
||||
# | | | |
|
||||
# +---+
|
||||
# | 1 | ? ? ?
|
||||
# +---+
|
||||
|
||||
# Heartbeating
|
||||
# ------------
|
||||
#
|
||||
# Controller ( time = 2.717828 )
|
||||
# ------------------
|
||||
# | | | |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# | 0 | | 0 | | 0 | | 0 |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# |
|
||||
# IDENTITY: foo
|
||||
# get message: time = 2.717828
|
||||
# reply with [ foo, 2.71828 ]
|
||||
#
|
||||
# Controller ( foo.status = OK )
|
||||
# ------------------
|
||||
# | | | |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# | 1 | | 0 | | 0 | | 0 |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# |
|
||||
# Controller tracks this node as good
|
||||
# for this heartbeat
|
||||
|
||||
# Shutdown
|
||||
# --------
|
||||
#
|
||||
# Controller ( state = RUNNING )
|
||||
# ------------------
|
||||
# | | | |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# | 1 | | 1 | | 1 | | 1 |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# |
|
||||
# IDENTITY: foo
|
||||
# send [ DONE ]
|
||||
|
||||
# Controller ( state = SHUTDOWN )
|
||||
# Controller topology.remove('foo')
|
||||
# ------------------
|
||||
# | | |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# | | | 1 | | 1 | | 1 |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# |
|
||||
# IDENTITY: foo
|
||||
# yield, stop sending messages
|
||||
|
||||
# Termination
|
||||
# ------------
|
||||
#
|
||||
# Controller ( state = TERMINATE )
|
||||
# ------------------
|
||||
# | | | |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# | 1 | | 1 | | 1 | | 1 |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# |
|
||||
# get message PROTOCOL.KILL
|
||||
|
||||
# Controller ( state = TERMINATE )
|
||||
# ------------------
|
||||
# | | | |
|
||||
# +---+ +---+ +---+ +---+
|
||||
# | 0 | | 0 | | 0 | | 0 |
|
||||
# +---+ +---+ +---+ +---+
|
||||
|
||||
INIT, SOURCES_READY, RUNNING, TERMINATE = CONTROL_STATES
|
||||
|
||||
state_transitions = frozenset([
|
||||
CONTROLLER_TRANSITIONS = frozenset([
|
||||
(-1 , INIT),
|
||||
(INIT , SOURCES_READY),
|
||||
(SOURCES_READY , RUNNING),
|
||||
@@ -122,6 +29,9 @@ class UnknownChatter(Exception):
|
||||
return """Component calling itself "%s" talking on unexpected channel"""\
|
||||
% self.named
|
||||
|
||||
|
||||
log = logbook.Logger('Controller')
|
||||
|
||||
class Controller(object):
|
||||
"""
|
||||
A N to M messaging system for inter component communication.
|
||||
@@ -133,9 +43,6 @@ class Controller(object):
|
||||
the individual components.
|
||||
:func message_sender: .
|
||||
|
||||
:param logging: Logging interface for tracking broker state
|
||||
Defaults to None
|
||||
|
||||
Topology is the set of components we expect to show up.
|
||||
States are the transitions the sytems go through. The
|
||||
simplest is from RUNNING -> NOT RUNNING .
|
||||
@@ -159,7 +66,7 @@ class Controller(object):
|
||||
debug = False
|
||||
period = 1
|
||||
|
||||
def __init__(self, pub_socket, route_socket, logger = None):
|
||||
def __init__(self, pub_socket, route_socket):
|
||||
|
||||
self.context = None
|
||||
self.zmq = None
|
||||
@@ -182,12 +89,6 @@ class Controller(object):
|
||||
|
||||
self.error_replay = OrderedDict()
|
||||
|
||||
if logger:
|
||||
self.logging = logger
|
||||
else:
|
||||
default_logger = logging.getLogger('ZiplineLogger')
|
||||
self.logging = default_logger
|
||||
|
||||
def init_zmq(self, flavor):
|
||||
|
||||
assert self.zmq_flavor in ['thread', 'mp', 'green']
|
||||
@@ -239,10 +140,10 @@ class Controller(object):
|
||||
def state(self, new):
|
||||
old, self._state = self._state, new
|
||||
|
||||
if (old, new) not in state_transitions:
|
||||
raise RuntimeError("[Controller] Invalid State Transition : %s -> %s" %(old, new))
|
||||
if (old, new) not in CONTROLLER_TRANSITIONS:
|
||||
raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new))
|
||||
else:
|
||||
self.logging.info("[Controller] State Transition : %s -> %s" %(old, new))
|
||||
log.info("State Transition : %s -> %s" %(old, new))
|
||||
|
||||
def run(self):
|
||||
self.running = True
|
||||
@@ -251,13 +152,13 @@ class Controller(object):
|
||||
try:
|
||||
return self._poll() # use a python loop
|
||||
except KeyboardInterrupt:
|
||||
self.logging.info('Shutdown event loop')
|
||||
log.debug('Shutdown event loop')
|
||||
|
||||
def log_status(self):
|
||||
"""
|
||||
Snapshot of the tracked components at every period.
|
||||
"""
|
||||
#self.logging.info("[Controller] Tracking : %s" % ([c for c in self.tracked],))
|
||||
#log.info("Tracking component : %s" % ([c for c in self.tracked],))
|
||||
pass
|
||||
|
||||
def replay_errors(self):
|
||||
@@ -366,11 +267,11 @@ class Controller(object):
|
||||
self.handle_recv(buffer[:])
|
||||
buffer = []
|
||||
except INVALID_CONTROL_FRAME:
|
||||
self.logging.error('Invalid frame', rawmessage)
|
||||
log.error('Invalid frame', rawmessage)
|
||||
pass
|
||||
|
||||
if socks.get(self.cancel) == self.zmq.POLLIN:
|
||||
self.logging.info('[Controller] Received Cancellation')
|
||||
log.info('Received Cancellation')
|
||||
rawmessage = self.cancel.recv()
|
||||
self.cancel.send('')
|
||||
self.shutdown(soft=True)
|
||||
@@ -430,7 +331,7 @@ class Controller(object):
|
||||
if self.state is CONTROL_STATES.TERMINATE:
|
||||
return
|
||||
|
||||
self.logging.info('[Controller] Now Tracking "%s" ' % component)
|
||||
log.info(' Now Tracking "%s" ' % component)
|
||||
|
||||
universal = self.new_universal
|
||||
init_handlers = {
|
||||
@@ -452,7 +353,7 @@ class Controller(object):
|
||||
def fail_universal(self):
|
||||
pass
|
||||
# TODO: this requires higher order functionality
|
||||
#self.logging.error('[Controller] System in exception state, shutting down')
|
||||
#log.error('System in exception state, shutting down')
|
||||
#self.shutdown(soft=True)
|
||||
|
||||
def fail(self, component):
|
||||
@@ -463,7 +364,7 @@ class Controller(object):
|
||||
fail_handlers = { }
|
||||
|
||||
if component in self.topology or self.freeform:
|
||||
self.logging.info('[Controller] Component "%s" timed out' % component)
|
||||
log.warning('Component "%s" timed out' % component)
|
||||
self.tracked.remove(component)
|
||||
fail_handlers.get(component, universal)()
|
||||
|
||||
@@ -472,7 +373,7 @@ class Controller(object):
|
||||
# -------------------
|
||||
|
||||
def done(self, component):
|
||||
self.logging.info('[Controller] Component "%s" done.' % component)
|
||||
log.info('Component "%s" signaled done.' % component)
|
||||
|
||||
# --------------
|
||||
# Error Handling
|
||||
@@ -482,7 +383,7 @@ class Controller(object):
|
||||
"""
|
||||
Shutdown the system on failure.
|
||||
"""
|
||||
self.logging.error('[Controller] System in exception state, shutting down')
|
||||
log.error('System in exception state, shutting down')
|
||||
self.shutdown(soft=True)
|
||||
|
||||
def exception(self, component, failure):
|
||||
@@ -491,7 +392,7 @@ class Controller(object):
|
||||
|
||||
if component in self.topology or self.freeform:
|
||||
self.error_replay[(component, time.time())] = failure
|
||||
self.logging.error('[Controller] Component "%s" in exception state' % component)
|
||||
log.error('Component in exception state: %s' % component)
|
||||
|
||||
exception_handlers.get(component, universal)()
|
||||
else:
|
||||
@@ -517,8 +418,9 @@ class Controller(object):
|
||||
self.responses.add(identity)
|
||||
else:
|
||||
# Otherwise its something weird and we don't know
|
||||
# what to do so just say so
|
||||
self.logging.error("Weird stuff happened: %s" % msg)
|
||||
# what to do so just say so, probably line noise
|
||||
# from ZeroMQ
|
||||
log.error("Weird stuff happened: %s" % msg)
|
||||
|
||||
# A component is telling us it failed, and how
|
||||
if id is CONTROL_PROTOCOL.EXCEPTION:
|
||||
@@ -572,11 +474,14 @@ class Controller(object):
|
||||
|
||||
def do_error_replay(self):
|
||||
for (component, time), error in self.error_replay.iteritems():
|
||||
self.logging.info('[Controller] Error Log for -- %s --:\n%s' %
|
||||
log.info('Error Log for -- %s --:\n%s' %
|
||||
(component, error))
|
||||
|
||||
def shutdown(self, hard=False, soft=True, context=None):
|
||||
|
||||
if self.state is CONTROL_STATES.TERMINATE:
|
||||
return
|
||||
|
||||
if not self.polling:
|
||||
return
|
||||
|
||||
@@ -587,7 +492,7 @@ class Controller(object):
|
||||
if hard:
|
||||
self.state = CONTROL_STATES.TERMINATE
|
||||
|
||||
self.logging.info('[Controller] Hard Shutdown')
|
||||
log.info('Hard Shutdown')
|
||||
|
||||
#for asoc in self.associated:
|
||||
#asoc.close()
|
||||
@@ -595,7 +500,7 @@ class Controller(object):
|
||||
if soft:
|
||||
self.state = CONTROL_STATES.TERMINATE
|
||||
|
||||
self.logging.info('[Controller] Soft Shutdown')
|
||||
log.info('Soft Shutdown')
|
||||
self.send_softkill()
|
||||
|
||||
#for asoc in self.associated:
|
||||
|
||||
@@ -59,11 +59,20 @@ Position Tracking
|
||||
+-----------------+----------------------------------------------------+
|
||||
| last_sale_price | price at last sale of the security on the exchange |
|
||||
+-----------------+----------------------------------------------------+
|
||||
| cost_basis | the volume weighted average price paid per share |
|
||||
+-----------------+----------------------------------------------------+
|
||||
|
||||
|
||||
|
||||
Performance Period
|
||||
==================
|
||||
|
||||
Performance Periods are updated with every trade. When calling
|
||||
code needs a portfolio object that fulfills the algorithm
|
||||
protocol, use the PerformancePeriod.as_portfolio method. See that
|
||||
method for comments on the specific fields provided (and
|
||||
omitted).
|
||||
|
||||
+---------------+------------------------------------------------------+
|
||||
| key | value |
|
||||
+===============+======================================================+
|
||||
@@ -111,7 +120,7 @@ Performance Period
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
import logbook
|
||||
import datetime
|
||||
import pytz
|
||||
import math
|
||||
@@ -121,7 +130,7 @@ import zmq
|
||||
import zipline.protocol as zp
|
||||
import zipline.finance.risk as risk
|
||||
|
||||
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
log = logbook.Logger('Performance')
|
||||
|
||||
class PerformanceTracker(object):
|
||||
"""
|
||||
@@ -190,7 +199,7 @@ class PerformanceTracker(object):
|
||||
)
|
||||
|
||||
def get_portfolio(self):
|
||||
return self.cumulative_performance.to_ndict()
|
||||
return self.cumulative_performance.as_portfolio()
|
||||
|
||||
def open(self, context):
|
||||
if self.results_addr:
|
||||
@@ -198,7 +207,7 @@ class PerformanceTracker(object):
|
||||
sock.connect(self.results_addr)
|
||||
self.results_socket = sock
|
||||
else:
|
||||
LOGGER.warn("Not streaming results because no results socket given")
|
||||
log.warn("Not streaming results because no results socket given")
|
||||
|
||||
def publish_to(self, results_addr):
|
||||
"""
|
||||
@@ -247,12 +256,12 @@ class PerformanceTracker(object):
|
||||
self.cumulative_performance.update_last_sale(event)
|
||||
self.todays_performance.update_last_sale(event)
|
||||
|
||||
|
||||
def handle_market_close(self):
|
||||
#calculate performance as of last trade
|
||||
self.cumulative_performance.calculate_performance()
|
||||
self.todays_performance.calculate_performance()
|
||||
|
||||
def handle_market_close(self):
|
||||
|
||||
# add the return results from today to the list of DailyReturn objects.
|
||||
todays_date = self.market_close.replace(hour=0, minute=0, second=0)
|
||||
todays_return_obj = risk.DailyReturn(
|
||||
@@ -278,14 +287,16 @@ class PerformanceTracker(object):
|
||||
if self.results_socket:
|
||||
msg = zp.PERF_FRAME(self.to_dict())
|
||||
self.results_socket.send(msg)
|
||||
else:
|
||||
log.debug(self.to_dict())
|
||||
|
||||
#
|
||||
if self.trading_environment.max_drawdown:
|
||||
returns = self.todays_performance.returns
|
||||
max_dd = -1 * self.trading_environment.max_drawdown
|
||||
if returns < max_dd:
|
||||
LOGGER.info(str(returns) + " broke through " + str(max_dd))
|
||||
LOGGER.info("Exceeded max drawdown.")
|
||||
log.info(str(returns) + " broke through " + str(max_dd))
|
||||
log.info("Exceeded max drawdown.")
|
||||
# mark the perf period with max loss flag,
|
||||
# so it shows up in the update, but don't end the test
|
||||
# here. Let the update go out before stopping
|
||||
@@ -320,8 +331,8 @@ class PerformanceTracker(object):
|
||||
"""
|
||||
|
||||
log_msg = "Simulated {n} trading days out of {m}."
|
||||
LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days))
|
||||
LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open))
|
||||
log.info(log_msg.format(n=self.day_count, m=self.total_days))
|
||||
log.info("first open: {d}".format(d=self.trading_environment.first_open))
|
||||
|
||||
# the stream will end on the last trading day, but will not trigger
|
||||
# an end of day, so we trigger the final market close here.
|
||||
@@ -336,7 +347,7 @@ class PerformanceTracker(object):
|
||||
)
|
||||
|
||||
if self.results_socket:
|
||||
LOGGER.info("about to stream the risk report...")
|
||||
log.info("about to stream the risk report...")
|
||||
risk_dict = self.risk_report.to_dict()
|
||||
|
||||
msg = zp.RISK_FRAME(risk_dict)
|
||||
@@ -490,14 +501,7 @@ class PerformancePeriod(object):
|
||||
self.positions[event.sid].last_sale_price = event.price
|
||||
self.positions[event.sid].last_sale_date = event.dt
|
||||
|
||||
def to_dict(self):
|
||||
"""
|
||||
Creates a dictionary representing the state of this performance
|
||||
period. See header comments for a detailed description.
|
||||
"""
|
||||
positions = self.get_positions_list()
|
||||
transactions = [x.as_dict() for x in self.processed_transactions]
|
||||
|
||||
def __core_dict(self):
|
||||
rval = {
|
||||
'ending_value' : self.ending_value,
|
||||
'capital_used' : self.period_capital_used,
|
||||
@@ -508,46 +512,69 @@ class PerformancePeriod(object):
|
||||
'cumulative_capital_used' : self.cumulative_capital_used,
|
||||
'max_capital_used' : self.max_capital_used,
|
||||
'max_leverage' : self.max_leverage,
|
||||
'positions' : positions,
|
||||
'pnl' : self.pnl,
|
||||
'returns' : self.returns,
|
||||
'transactions' : transactions,
|
||||
'period_open' : self.period_open,
|
||||
'period_close' : self.period_close
|
||||
}
|
||||
|
||||
return rval
|
||||
|
||||
|
||||
def to_dict(self):
|
||||
"""
|
||||
Creates a dictionary representing the state of this performance
|
||||
period. See header comments for a detailed description.
|
||||
"""
|
||||
rval = self.__core_dict()
|
||||
positions = self.get_positions_list()
|
||||
rval['positions'] = positions
|
||||
|
||||
# we want the key to be absent, not just empty
|
||||
if not self.keep_transactions:
|
||||
del rval['transactions']
|
||||
if self.keep_transactions:
|
||||
transactions = [x.as_dict() for x in self.processed_transactions]
|
||||
rval['transactions'] = transactions
|
||||
|
||||
return rval
|
||||
|
||||
def to_ndict(self):
|
||||
def as_portfolio(self):
|
||||
"""
|
||||
Creates a ndict representing the state of this perfomance period.
|
||||
Properties are the same as the results of to_dict. See header comments
|
||||
for a detailed description.
|
||||
|
||||
The purpose of this method is to provide a portfolio
|
||||
object to algorithms running inside the same trading
|
||||
client. The data needed is captured raw in a
|
||||
PerformancePeriod, and in this method we rename some
|
||||
fields for usability and remove extraneous fields.
|
||||
"""
|
||||
positions = self.get_positions(ndicted=True)
|
||||
portfolio = self.__core_dict()
|
||||
# rename:
|
||||
# ending_cash -> cash
|
||||
# period_open -> backtest_start
|
||||
#
|
||||
# remove:
|
||||
# period_close, starting_value,
|
||||
# cumulative_capital_used, max_leverage, max_capital_used
|
||||
portfolio['cash'] = portfolio['ending_cash']
|
||||
portfolio['start_date'] = portfolio['period_open']
|
||||
portfolio['position_value'] = portfolio['ending_value']
|
||||
|
||||
positions = zp.ndict(positions)
|
||||
del(portfolio['ending_cash'])
|
||||
del(portfolio['period_open'])
|
||||
del(portfolio['period_close'])
|
||||
del(portfolio['starting_value'])
|
||||
del(portfolio['ending_value'])
|
||||
del(portfolio['cumulative_capital_used'])
|
||||
del(portfolio['max_leverage'])
|
||||
del(portfolio['max_capital_used'])
|
||||
|
||||
return zp.ndict({
|
||||
'ending_value' : self.ending_value,
|
||||
'capital_used' : self.period_capital_used,
|
||||
'starting_value' : self.starting_value,
|
||||
'starting_cash' : self.starting_cash,
|
||||
'ending_cash' : self.ending_cash,
|
||||
'cumulative_capital_used' : self.cumulative_capital_used,
|
||||
'max_capital_used' : self.max_capital_used,
|
||||
'max_leverage' : self.max_leverage,
|
||||
'positions' : positions,
|
||||
'transactions' : self.processed_transactions
|
||||
})
|
||||
portfolio['positions'] = self.get_positions(ndicted=True)
|
||||
return zp.ndict(portfolio)
|
||||
|
||||
def get_positions(self, ndicted=False):
|
||||
positions = {}
|
||||
if ndicted:
|
||||
positions = zp.ndict({})
|
||||
else:
|
||||
positions = {}
|
||||
|
||||
for sid, pos in self.positions.iteritems():
|
||||
cur = pos.to_dict()
|
||||
if ndicted:
|
||||
|
||||
@@ -27,7 +27,7 @@ Risk Report
|
||||
| alpha | The _algorithm_ alpha to the benchmark. |
|
||||
+-----------------+----------------------------------------------------+
|
||||
| excess_return | The excess return of the algorithm over the |
|
||||
| | benchmark. |
|
||||
| | treasuries. |
|
||||
+-----------------+----------------------------------------------------+
|
||||
| max_drawdown | The largest relative peak to relative trough move |
|
||||
| | for the portfolio returns between self.start_date |
|
||||
@@ -187,6 +187,8 @@ class RiskMetrics():
|
||||
return period_returns, returns
|
||||
|
||||
def calculate_volatility(self, daily_returns):
|
||||
# TODO: we should be using an annualized number for the
|
||||
# square root, not the days in the period.
|
||||
return np.std(daily_returns, ddof=1) * math.sqrt(self.trading_days)
|
||||
|
||||
def calculate_sharpe(self):
|
||||
|
||||
@@ -7,20 +7,20 @@ from zipline.finance.movingaverage import EventWindow
|
||||
class VWAPTransform(BaseTransform):
|
||||
|
||||
def init(self, name, daycount=3):
|
||||
self.state = {}
|
||||
self.state['name'] = name
|
||||
self.props = {}
|
||||
self.props['name'] = name
|
||||
self.daycount = daycount
|
||||
self.by_sid = defaultdict(self.create_vwap)
|
||||
|
||||
@property
|
||||
def get_id(self):
|
||||
return self.state['name']
|
||||
return self.props['name']
|
||||
|
||||
def transform(self, event):
|
||||
cur = self.by_sid[event.sid]
|
||||
cur.update(event)
|
||||
self.state['value'] = cur.vwap
|
||||
return self.state
|
||||
self.props['value'] = cur.vwap
|
||||
return self.props
|
||||
|
||||
def create_vwap(self):
|
||||
return DailyVWAP(self.daycount)
|
||||
|
||||
@@ -60,8 +60,6 @@ before invoking simulate.
|
||||
+---------------------------------+
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
import zipline.utils.factory as factory
|
||||
|
||||
from zipline.components import DataSource
|
||||
@@ -73,8 +71,6 @@ from zipline.core.devsimulator import Simulator
|
||||
from zipline.core.monitor import Controller
|
||||
from zipline.finance.trading import SIMULATION_STYLE
|
||||
|
||||
LOGGER = logging.getLogger('ZiplineLogger')
|
||||
|
||||
class SimulatedTrading(object):
|
||||
"""
|
||||
Zipline with::
|
||||
@@ -133,7 +129,6 @@ class SimulatedTrading(object):
|
||||
self.con = Controller(
|
||||
sockets[6],
|
||||
sockets[7],
|
||||
logger = LOGGER
|
||||
)
|
||||
|
||||
self.con.cancel_socket = self.allocator.lease(1)[0]
|
||||
|
||||
@@ -25,7 +25,7 @@ class BaseTransform(Component):
|
||||
|
||||
@property
|
||||
def get_id(self):
|
||||
return self.state['name']
|
||||
return self.props['name']
|
||||
|
||||
@property
|
||||
def get_type(self):
|
||||
@@ -116,9 +116,9 @@ class BaseTransform(Component):
|
||||
|
||||
Transforms run in parallel and results are merged into a
|
||||
single map, so transform names must be unique. Best practice
|
||||
is to use the self.state object initialized from the transform
|
||||
is to use the self.props object initialized from the transform
|
||||
configuration, and only set the transformed value::
|
||||
|
||||
self.state['value'] = transformed_value
|
||||
self.props['value'] = transformed_value
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -39,24 +39,6 @@ class WorkflowMeta(type):
|
||||
"""
|
||||
Base metaclass component workflows.
|
||||
"""
|
||||
@property
|
||||
def state(self):
|
||||
if not hasattr(self, '_state'):
|
||||
self._state = self.initial_state
|
||||
else:
|
||||
return self._state
|
||||
|
||||
@state.setter
|
||||
def state(self, new):
|
||||
if not hasattr(self, '_state'):
|
||||
self._state = self.initial_state
|
||||
|
||||
old = self._state
|
||||
|
||||
if (old, new) in self.workflow:
|
||||
self._state = new
|
||||
else:
|
||||
raise RuntimeError("Invalid State Transition : %s -> %s" %(old, new))
|
||||
|
||||
def __new__(cls, name, mro, attrs):
|
||||
base = 'Component'
|
||||
@@ -71,7 +53,6 @@ class WorkflowMeta(type):
|
||||
raise RuntimeError('`workflow` is a reserved attribute.')
|
||||
|
||||
if not state:
|
||||
import pdb; pdb.set_trace()
|
||||
raise RuntimeError('Must specify states')
|
||||
|
||||
if not transitions:
|
||||
|
||||
@@ -3,6 +3,7 @@ Small classes to assist with timezone calculations, LOGGER configuration,
|
||||
and other common operations.
|
||||
"""
|
||||
|
||||
# DEPRECATED DO NOT USE
|
||||
|
||||
import logging
|
||||
import logging.config
|
||||
|
||||
Reference in New Issue
Block a user