Refactor lots of things.

This commit is contained in:
Stephen Diehl
2012-05-14 10:57:40 -04:00
parent 6df73110ff
commit 8b95aebcf2
16 changed files with 61 additions and 47 deletions
+1 -1
View File
@@ -15,7 +15,7 @@ import zipline.protocol as zp
import zipline.finance.performance as perf
from zipline.test_algorithms import TestAlgorithm
from zipline.sources import SpecificEquityTrades
from zipline.finance.sources import SpecificEquityTrades
from zipline.finance.trading import TransactionSimulator, \
TradeSimulationClient, TradingEnvironment
from zipline.simulator import AddressAllocator, Simulator
+2 -1
View File
@@ -5,7 +5,7 @@ Zipline
# This is *not* a place to dump arbitrary classes/modules for convenience,
# it is a place to expose the public interfaces.
import protocol
import protocol # namespace
from core.monitor import Controller
from lines import SimulatedTrading
from core.host import ComponentHost
@@ -16,5 +16,6 @@ __all__ = [
Controller,
ComponentHost,
protocol,
namedict,
ndict
]
+1 -3
View File
@@ -1,13 +1,11 @@
from feed import Feed
from merge import Merge
from passthrough import PassthroughTransform
from source import DataSource
from transform import BaseTransform
from datasource import DataSource
__all__ = [
Feed,
Merge,
PassthroughTransform,
DataSource,
BaseTransform,
]
+1 -1
View File
@@ -5,7 +5,7 @@ Commonly used messaging components.
import logging
import zipline.protocol as zp
from zipline.component import Component
from zipline.core.component import Component
from zipline.protocol import COMPONENT_TYPE
LOGGER = logging.getLogger('ZiplineLogger')
+1 -1
View File
@@ -1,7 +1,7 @@
import logging
from collections import Counter
from zipline.core import Component
from zipline.core.component import Component
import zipline.protocol as zp
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
+4
View File
@@ -1,5 +1,9 @@
from host import ComponentHost
from component import Component
from monitor import Controller
__all__ = [
Component,
Controller,
ComponentHost
]
+6 -5
View File
@@ -7,6 +7,7 @@ import sys
import uuid
import time
import socket
import logging
import traceback
import humanhash
@@ -17,11 +18,11 @@ import gevent_zeromq
# zmq_ctypes
#import zmq_ctypes
import zipline.util as qutil
from zipline.gpoll import _Poller as GeventPoller
from zipline.utils.gpoll import _Poller as GeventPoller
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \
COMPONENT_FAILURE, CONTROL_FRAME
LOGGER = logging.getLogger('ZiplineLogger')
class Component(object):
"""
@@ -309,7 +310,7 @@ class Component(object):
)
self.control_out.send(exception_frame)
qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id))
def signal_done(self):
"""
@@ -336,7 +337,7 @@ class Component(object):
#notify internal work look that we're done
self.done = True # TODO: use state flag
qutil.LOGGER.info("[%s] DONE" % self.get_id)
LOGGER.info("[%s] DONE" % self.get_id)
# -----------
# Messaging
@@ -456,7 +457,7 @@ class Component(object):
DEPRECATED, left in for compatability for now.
"""
qutil.LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id))
self.sync_socket = self.context.socket(self.zmq.REQ)
self.sync_socket.connect(self.addresses['sync_address'])
+4 -3
View File
@@ -1,10 +1,11 @@
import logging
import datetime
from core.component import Component
from components import Feed, Merge, PassthroughTransform, \
DataSource, BaseTransform
from component import Component
from zipline.transforms import BaseTransform
from zipline.components import Feed, Merge, PassthroughTransform, \
DataSource
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE
LOGGER = logging.getLogger('ZiplineLogger')
+12 -15
View File
@@ -5,11 +5,12 @@ import datetime
import random
import pytz
import zipline.messaging as zm
from zipline.components import DataSource
from zipline.utils import ndict, namedict
import zipline.protocol as zp
class TradeDataSource(zm.DataSource):
class TradeDataSource(DataSource):
def send(self, event):
"""
@@ -18,19 +19,19 @@ class TradeDataSource(zm.DataSource):
:py:func: `zipline.protocol.TRADE_FRAME`
:rtype: None
"""
event.source_id = self.get_id
if event.sid in self.filter['SID']:
if event.sid in self.filter['SID']:
message = zp.DATASOURCE_FRAME(event)
else:
blank = zp.namedict({
blank = namedict({
"type" : zp.DATASOURCE_TYPE.TRADE,
"source_id" : self.get_id
})
message = zp.DATASOURCE_FRAME(blank)
self.data_socket.send(message)
class RandomEquityTrades(TradeDataSource):
"""
@@ -38,7 +39,7 @@ class RandomEquityTrades(TradeDataSource):
"""
def __init__(self, sid, source_id, count):
zm.DataSource.__init__(self, source_id)
DataSource.__init__(self, source_id)
self.count = count
self.incr = 0
self.sid = sid
@@ -67,7 +68,6 @@ class RandomEquityTrades(TradeDataSource):
})
self.send(event)
self.incr += 1
class SpecificEquityTrades(TradeDataSource):
@@ -77,7 +77,7 @@ class SpecificEquityTrades(TradeDataSource):
def __init__(self, source_id, event_list):
"""
:param event_list: should be a chronologically ordered list of
:param event_list: should be a chronologically ordered list of
dictionaries in the following form:
event = {
@@ -87,14 +87,13 @@ class SpecificEquityTrades(TradeDataSource):
'volume' : integer for volume
}
"""
zm.DataSource.__init__(self, source_id)
DataSource.__init__(self, source_id)
self.event_list = event_list
self.count = 0
def get_type(self):
zp.COMPONENT_TYPE.SOURCE
def do_work(self):
if(len(self.event_list) == 0):
self.signal_done()
@@ -103,5 +102,3 @@ class SpecificEquityTrades(TradeDataSource):
event = self.event_list.pop(0)
self.send(zp.namedict(event))
self.count +=1
+3 -3
View File
@@ -9,7 +9,7 @@ from collections import Counter
# from gevent.select import select
import zipline.messaging as qmsg
from zipline.core import Component
import zipline.protocol as zp
import zipline.finance.performance as perf
@@ -26,10 +26,10 @@ SIMULATION_STYLE = Enum(
LOGGER = logging.getLogger('ZiplineLogger')
class TradeSimulationClient(qmsg.Component):
class TradeSimulationClient(Component):
def __init__(self, trading_environment, sim_style):
qmsg.Component.__init__(self)
Component.__init__(self)
self.received_count = 0
self.prev_dt = None
self.event_queue = None
+6 -4
View File
@@ -73,10 +73,12 @@ import zipline.utils.factory as factory
import zipline.finance.risk as risk
import zipline.protocol as zp
import zipline.finance.performance as perf
import zipline.messaging as zmsg
from zipline.components import DataSource
from zipline.transforms import BaseTransform
from zipline.test_algorithms import TestAlgorithm
from zipline.sources import SpecificEquityTrades
from zipline.finance.sources import SpecificEquityTrades
from zipline.finance.trading import TradeSimulationClient
from zipline.simulator import AddressAllocator, Simulator
from zipline.core.monitor import Controller
@@ -289,14 +291,14 @@ class SimulatedTrading(object):
Adds the source to the zipline, sets the sid filter of the
source to the algorithm's sid filter.
"""
assert isinstance(source, zmsg.DataSource)
assert isinstance(source, DataSource)
self.check_started()
source.set_filter('SID', self.algorithm.get_sid_filter())
self.sim.register_components([source])
self.sources[source.get_id] = source
def add_transform(self, transform):
assert isinstance(transform, zmsg.BaseTransform)
assert isinstance(transform, BaseTransform)
self.check_started()
self.sim.register_components([transform])
self.transforms[transform.get_id] = transform
+2 -5
View File
@@ -3,10 +3,7 @@ Simulator hosts all the components necessary to execute a simluation. See :py:me
"""
import threading
import mock
from collections import defaultdict
from zipline.core.monitor import Controller
from zipline.messaging import ComponentHost
from zipline.core import ComponentHost
class AddressAllocator(object):
@@ -34,7 +31,7 @@ class Simulator(ComponentHost):
ComponentHost.__init__(self, addresses)
self.subthreads = []
self.running = False
@property
def get_id(self):
return 'Simple Simulator'
+9 -3
View File
@@ -6,14 +6,20 @@ Transforms provide re-useable components for stream processing. All
Transforms expect to receive data events from zipline.core.DataFeed
asynchronously via zeromq. Each transform is designed to run in independent
process space, independently of all other transforms, to allow for parallel
computation.
computation.
Each transform must maintain the state necessary to calculate the transform of
each new feed events.
each new feed events.
To simplify the consumption of feed and transform data events, this module
also provides the TransformsMerge class. TransformsMerge initializes as set of
transforms and subscribes to their output. Each feed event is then combined with
all the transforms of that event into a single new message.
"""
"""
from base import BaseTransform
__all__ = [
BaseTransform,
]
+1 -1
View File
@@ -1,5 +1,5 @@
import logging
from zipline.core import Component
from zipline.core.component import Component
import zipline.protocol as zp
from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \
+6
View File
@@ -0,0 +1,6 @@
from protocol_utils import namedict, ndict
__all__ = [
namedict,
ndict,
]
+2 -1
View File
@@ -1,6 +1,7 @@
"""
Factory functions to prepare useful data for tests.
"""
import pytz
import msgpack
import random
@@ -8,7 +9,7 @@ import random
from datetime import datetime, timedelta
import zipline.finance.risk as risk
import zipline.protocol as zp
from zipline.sources import SpecificEquityTrades, RandomEquityTrades
from zipline.finance.sources import SpecificEquityTrades, RandomEquityTrades
from zipline.finance.trading import TradingEnvironment
def load_market_data():