From 8b95aebcf2c7701d3ab1a413a00c8b1f907b4ac6 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Mon, 14 May 2012 10:57:40 -0400 Subject: [PATCH] Refactor lots of things. --- tests/test_finance.py | 2 +- zipline/__init__.py | 3 ++- zipline/components/__init__.py | 4 +--- zipline/components/datasource.py | 2 +- zipline/components/feed.py | 2 +- zipline/core/__init__.py | 4 ++++ zipline/core/component.py | 11 ++++++----- zipline/core/host.py | 7 ++++--- zipline/finance/sources.py | 27 ++++++++++++--------------- zipline/finance/trading.py | 6 +++--- zipline/lines.py | 10 ++++++---- zipline/simulator.py | 7 ++----- zipline/transforms/__init__.py | 12 +++++++++--- zipline/transforms/base.py | 2 +- zipline/utils/__init__.py | 6 ++++++ zipline/utils/factory.py | 3 ++- 16 files changed, 61 insertions(+), 47 deletions(-) diff --git a/tests/test_finance.py b/tests/test_finance.py index ec9e75af..4d5d88fa 100644 --- a/tests/test_finance.py +++ b/tests/test_finance.py @@ -15,7 +15,7 @@ import zipline.protocol as zp import zipline.finance.performance as perf from zipline.test_algorithms import TestAlgorithm -from zipline.sources import SpecificEquityTrades +from zipline.finance.sources import SpecificEquityTrades from zipline.finance.trading import TransactionSimulator, \ TradeSimulationClient, TradingEnvironment from zipline.simulator import AddressAllocator, Simulator diff --git a/zipline/__init__.py b/zipline/__init__.py index afdd901e..4a30d7cc 100644 --- a/zipline/__init__.py +++ b/zipline/__init__.py @@ -5,7 +5,7 @@ Zipline # This is *not* a place to dump arbitrary classes/modules for convenience, # it is a place to expose the public interfaces. -import protocol +import protocol # namespace from core.monitor import Controller from lines import SimulatedTrading from core.host import ComponentHost @@ -16,5 +16,6 @@ __all__ = [ Controller, ComponentHost, protocol, + namedict, ndict ] diff --git a/zipline/components/__init__.py b/zipline/components/__init__.py index 57823efa..b845f2db 100644 --- a/zipline/components/__init__.py +++ b/zipline/components/__init__.py @@ -1,13 +1,11 @@ from feed import Feed from merge import Merge from passthrough import PassthroughTransform -from source import DataSource -from transform import BaseTransform +from datasource import DataSource __all__ = [ Feed, Merge, PassthroughTransform, DataSource, - BaseTransform, ] diff --git a/zipline/components/datasource.py b/zipline/components/datasource.py index d2452dad..27539b3e 100644 --- a/zipline/components/datasource.py +++ b/zipline/components/datasource.py @@ -5,7 +5,7 @@ Commonly used messaging components. import logging import zipline.protocol as zp -from zipline.component import Component +from zipline.core.component import Component from zipline.protocol import COMPONENT_TYPE LOGGER = logging.getLogger('ZiplineLogger') diff --git a/zipline/components/feed.py b/zipline/components/feed.py index c7353c86..bff79e79 100644 --- a/zipline/components/feed.py +++ b/zipline/components/feed.py @@ -1,7 +1,7 @@ import logging from collections import Counter -from zipline.core import Component +from zipline.core.component import Component import zipline.protocol as zp from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ diff --git a/zipline/core/__init__.py b/zipline/core/__init__.py index 1347bd67..d487dd05 100644 --- a/zipline/core/__init__.py +++ b/zipline/core/__init__.py @@ -1,5 +1,9 @@ +from host import ComponentHost from component import Component +from monitor import Controller __all__ = [ Component, + Controller, + ComponentHost ] diff --git a/zipline/core/component.py b/zipline/core/component.py index 4cfb8863..dddd6f4f 100644 --- a/zipline/core/component.py +++ b/zipline/core/component.py @@ -7,6 +7,7 @@ import sys import uuid import time import socket +import logging import traceback import humanhash @@ -17,11 +18,11 @@ import gevent_zeromq # zmq_ctypes #import zmq_ctypes -import zipline.util as qutil -from zipline.gpoll import _Poller as GeventPoller +from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \ COMPONENT_FAILURE, CONTROL_FRAME +LOGGER = logging.getLogger('ZiplineLogger') class Component(object): """ @@ -309,7 +310,7 @@ class Component(object): ) self.control_out.send(exception_frame) - qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) + LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) def signal_done(self): """ @@ -336,7 +337,7 @@ class Component(object): #notify internal work look that we're done self.done = True # TODO: use state flag - qutil.LOGGER.info("[%s] DONE" % self.get_id) + LOGGER.info("[%s] DONE" % self.get_id) # ----------- # Messaging @@ -456,7 +457,7 @@ class Component(object): DEPRECATED, left in for compatability for now. """ - qutil.LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) + LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) self.sync_socket = self.context.socket(self.zmq.REQ) self.sync_socket.connect(self.addresses['sync_address']) diff --git a/zipline/core/host.py b/zipline/core/host.py index c650fed3..250daf96 100644 --- a/zipline/core/host.py +++ b/zipline/core/host.py @@ -1,10 +1,11 @@ import logging import datetime -from core.component import Component -from components import Feed, Merge, PassthroughTransform, \ - DataSource, BaseTransform +from component import Component +from zipline.transforms import BaseTransform +from zipline.components import Feed, Merge, PassthroughTransform, \ + DataSource from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE LOGGER = logging.getLogger('ZiplineLogger') diff --git a/zipline/finance/sources.py b/zipline/finance/sources.py index bf08644c..bd6ea035 100644 --- a/zipline/finance/sources.py +++ b/zipline/finance/sources.py @@ -5,11 +5,12 @@ import datetime import random import pytz -import zipline.messaging as zm +from zipline.components import DataSource +from zipline.utils import ndict, namedict + import zipline.protocol as zp - -class TradeDataSource(zm.DataSource): +class TradeDataSource(DataSource): def send(self, event): """ @@ -18,19 +19,19 @@ class TradeDataSource(zm.DataSource): :py:func: `zipline.protocol.TRADE_FRAME` :rtype: None """ - + event.source_id = self.get_id - if event.sid in self.filter['SID']: + if event.sid in self.filter['SID']: message = zp.DATASOURCE_FRAME(event) else: - blank = zp.namedict({ + blank = namedict({ "type" : zp.DATASOURCE_TYPE.TRADE, "source_id" : self.get_id }) message = zp.DATASOURCE_FRAME(blank) - + self.data_socket.send(message) - + class RandomEquityTrades(TradeDataSource): """ @@ -38,7 +39,7 @@ class RandomEquityTrades(TradeDataSource): """ def __init__(self, sid, source_id, count): - zm.DataSource.__init__(self, source_id) + DataSource.__init__(self, source_id) self.count = count self.incr = 0 self.sid = sid @@ -67,7 +68,6 @@ class RandomEquityTrades(TradeDataSource): }) self.send(event) self.incr += 1 - class SpecificEquityTrades(TradeDataSource): @@ -77,7 +77,7 @@ class SpecificEquityTrades(TradeDataSource): def __init__(self, source_id, event_list): """ - :param event_list: should be a chronologically ordered list of + :param event_list: should be a chronologically ordered list of dictionaries in the following form: event = { @@ -87,14 +87,13 @@ class SpecificEquityTrades(TradeDataSource): 'volume' : integer for volume } """ - zm.DataSource.__init__(self, source_id) + DataSource.__init__(self, source_id) self.event_list = event_list self.count = 0 def get_type(self): zp.COMPONENT_TYPE.SOURCE - def do_work(self): if(len(self.event_list) == 0): self.signal_done() @@ -103,5 +102,3 @@ class SpecificEquityTrades(TradeDataSource): event = self.event_list.pop(0) self.send(zp.namedict(event)) self.count +=1 - - diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 8ca7712a..8f8f7380 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -9,7 +9,7 @@ from collections import Counter # from gevent.select import select -import zipline.messaging as qmsg +from zipline.core import Component import zipline.protocol as zp import zipline.finance.performance as perf @@ -26,10 +26,10 @@ SIMULATION_STYLE = Enum( LOGGER = logging.getLogger('ZiplineLogger') -class TradeSimulationClient(qmsg.Component): +class TradeSimulationClient(Component): def __init__(self, trading_environment, sim_style): - qmsg.Component.__init__(self) + Component.__init__(self) self.received_count = 0 self.prev_dt = None self.event_queue = None diff --git a/zipline/lines.py b/zipline/lines.py index ced4824c..b0b51eb7 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -73,10 +73,12 @@ import zipline.utils.factory as factory import zipline.finance.risk as risk import zipline.protocol as zp import zipline.finance.performance as perf -import zipline.messaging as zmsg + +from zipline.components import DataSource +from zipline.transforms import BaseTransform from zipline.test_algorithms import TestAlgorithm -from zipline.sources import SpecificEquityTrades +from zipline.finance.sources import SpecificEquityTrades from zipline.finance.trading import TradeSimulationClient from zipline.simulator import AddressAllocator, Simulator from zipline.core.monitor import Controller @@ -289,14 +291,14 @@ class SimulatedTrading(object): Adds the source to the zipline, sets the sid filter of the source to the algorithm's sid filter. """ - assert isinstance(source, zmsg.DataSource) + assert isinstance(source, DataSource) self.check_started() source.set_filter('SID', self.algorithm.get_sid_filter()) self.sim.register_components([source]) self.sources[source.get_id] = source def add_transform(self, transform): - assert isinstance(transform, zmsg.BaseTransform) + assert isinstance(transform, BaseTransform) self.check_started() self.sim.register_components([transform]) self.transforms[transform.get_id] = transform diff --git a/zipline/simulator.py b/zipline/simulator.py index e601a8b7..37418d06 100644 --- a/zipline/simulator.py +++ b/zipline/simulator.py @@ -3,10 +3,7 @@ Simulator hosts all the components necessary to execute a simluation. See :py:me """ import threading -import mock -from collections import defaultdict -from zipline.core.monitor import Controller -from zipline.messaging import ComponentHost +from zipline.core import ComponentHost class AddressAllocator(object): @@ -34,7 +31,7 @@ class Simulator(ComponentHost): ComponentHost.__init__(self, addresses) self.subthreads = [] self.running = False - + @property def get_id(self): return 'Simple Simulator' diff --git a/zipline/transforms/__init__.py b/zipline/transforms/__init__.py index ea33ca7c..fb244e2e 100644 --- a/zipline/transforms/__init__.py +++ b/zipline/transforms/__init__.py @@ -6,14 +6,20 @@ Transforms provide re-useable components for stream processing. All Transforms expect to receive data events from zipline.core.DataFeed asynchronously via zeromq. Each transform is designed to run in independent process space, independently of all other transforms, to allow for parallel -computation. +computation. Each transform must maintain the state necessary to calculate the transform of -each new feed events. +each new feed events. To simplify the consumption of feed and transform data events, this module also provides the TransformsMerge class. TransformsMerge initializes as set of transforms and subscribes to their output. Each feed event is then combined with all the transforms of that event into a single new message. -""" \ No newline at end of file +""" + +from base import BaseTransform + +__all__ = [ + BaseTransform, +] diff --git a/zipline/transforms/base.py b/zipline/transforms/base.py index d763113e..90162437 100644 --- a/zipline/transforms/base.py +++ b/zipline/transforms/base.py @@ -1,5 +1,5 @@ import logging -from zipline.core import Component +from zipline.core.component import Component import zipline.protocol as zp from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ diff --git a/zipline/utils/__init__.py b/zipline/utils/__init__.py index e69de29b..b8670bc9 100644 --- a/zipline/utils/__init__.py +++ b/zipline/utils/__init__.py @@ -0,0 +1,6 @@ +from protocol_utils import namedict, ndict + +__all__ = [ + namedict, + ndict, +] diff --git a/zipline/utils/factory.py b/zipline/utils/factory.py index 937e92e3..a19b9a48 100644 --- a/zipline/utils/factory.py +++ b/zipline/utils/factory.py @@ -1,6 +1,7 @@ """ Factory functions to prepare useful data for tests. """ + import pytz import msgpack import random @@ -8,7 +9,7 @@ import random from datetime import datetime, timedelta import zipline.finance.risk as risk import zipline.protocol as zp -from zipline.sources import SpecificEquityTrades, RandomEquityTrades +from zipline.finance.sources import SpecificEquityTrades, RandomEquityTrades from zipline.finance.trading import TradingEnvironment def load_market_data():