beginning refactor to use single threaded simulator.

This commit is contained in:
fawce
2012-08-06 13:11:20 -04:00
parent b67cbb2aab
commit 06dc6f7acb
6 changed files with 76 additions and 282 deletions
-6
View File
@@ -6,15 +6,9 @@ Zipline
# it is a place to expose the public interfaces.
import protocol # namespace
from core.monitor import Monitor
from lines import SimulatedTrading
from core.host import ComponentHost
from utils.protocol_utils import ndict
__all__ = [
SimulatedTrading,
Monitor,
ComponentHost,
protocol,
ndict
]
+7 -9
View File
@@ -1,6 +1,5 @@
import pytz
from time import sleep
from pprint import pprint as pp
from datetime import datetime, timedelta
@@ -16,7 +15,7 @@ from zipline.gens.tradesimulation import TradeSimulationClient as tsc
import zipline.protocol as zp
if __name__ == "__main__":
filter = [2,3]
#Set up source a. One minute between events.
args_a = tuple()
@@ -42,19 +41,18 @@ if __name__ == "__main__":
#Set up source c. Three minutes between events.
sorted = date_sorted_sources(source_a, source_b)
sorted = date_sorted_sources(source_a, source_b)
passthrough = StatefulTransform(Passthrough)
mavg_price = StatefulTransform(MovingAverage, timedelta(minutes = 20), ['price'])
merged = merged_transforms(sorted, passthrough, mavg_price)
algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3])
environment = create_trading_environment(year = 2012)
style = zp.SIMULATION_STYLE.FIXED_SLIPPAGE
trading_client = tsc(algo, environment, style)
for message in trading_client.simulate(merged):
pp(message)
+1 -2
View File
@@ -6,8 +6,7 @@ import random
from itertools import chain, cycle, ifilter, izip
from datetime import datetime, timedelta
from zipline.utils.factory import create_trade
from zipline.gens.utils import hash_args
from zipline.gens.utils import hash_args, create_trade
def date_gen(start = datetime(2006, 6, 6, 12),
delta = timedelta(minutes = 1),
+12
View File
@@ -66,6 +66,18 @@ def hash_args(*args, **kwargs):
hasher.update(combined)
return hasher.hexdigest()
def create_trade(sid, price, amount, datetime, source_id = "test_factory"):
row = ndict({
'source_id' : source_id,
'type' : DATASOURCE_TYPE.TRADE,
'sid' : sid,
'dt' : datetime,
'price' : price,
'volume' : amount
})
return row
def assert_datasource_protocol(event):
"""Assert that an event meets the protocol for datasource outputs."""
+40 -232
View File
@@ -60,110 +60,41 @@ before invoking simulate.
+---------------------------------+
"""
import inspect
import logbook
#import zipline.utils.factory as factory
from zipline.components import DataSource
from zipline.transforms import BaseTransform
from zipline.test_algorithms import TestAlgorithm
from zipline.components import TradeSimulationClient
from zipline.core.process import ProcessSimulator
from zipline.core.monitor import Monitor
from zipline.finance.trading import SIMULATION_STYLE
from zipline.utils import factory
import pytz
from pprint import pprint as pp
from datetime import datetime, timedelta
from zipline.utils.factory import create_trading_environment
from zipline.test_algorithms import TestAlgorithm
from zipline.gens.composites import SourceBundle, TransformBundle, \
date_sorted_sources, merged_transforms
from zipline.gens.tradegens import SpecificEquityTrades
from zipline.gens.transform import MovingAverage, Passthrough, StatefulTransform
from zipline.gens.tradesimulation import TradeSimulationClient as tsc
import zipline.protocol as zp
log = logbook.Logger('Lines')
class SimulatedTrading(object):
"""
Zipline with::
- _no_ data sources.
- Trade simulation client, which is available to send callbacks on
events and also accept orders to be simulated.
- An order data source, which will receive orders from the trade
simulation client, and feed them into the event stream to be
serialized and order alongside all other data source events.
- transaction simulation transformation, which receives the order
events and estimates a theoretical execution price and volume.
@staticmethod
def create_simulation(sources, transforms, algorithm, environment, style):
All components in this zipline are subject to heartbeat checks and
a control monitor, which can kill the entire zipline in the event of
exceptions in one of the components or an external request to end the
simulation.
"""
sorted = date_sorted_sources(*sources)
passthrough = StatefulTransform(Passthrough)
def __init__(self, **config):
"""
:param config: a dict with the following required properties::
- algorithm: a class that follows the algorithm protocol. See
:py:meth:`zipline.finance.trading.TradeSimulationClient.add_algorithm
for details.
- trading_environment: an instance of
:py:class:`zipline.trading.TradingEnvironment`
- allocator: an instance of
:py:class:`zipline.simulator.AddressAllocator`
- simulation_style: optional parameter that configures the
:py:class:`zipline.finance.trading.TransactionSimulator`. Expects
a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading`
"""
assert isinstance(config, dict)
self.algorithm = config['algorithm']
self.allocator = config['allocator']
self.trading_environment = config['trading_environment']
self.sim_style = config.get('simulation_style')
self.send_sighup = config.get('send_sighup', False)
self.leased_sockets = []
self.sim_context = None
sockets = self.allocate_sockets(7)
addresses = {
'sync_address' : sockets[0],
'data_address' : sockets[1],
'feed_address' : sockets[2],
'merge_address' : sockets[3],
# TODO: this refers to the results of the merge, a
# horribly confusing name for the socket.
'results_address' : sockets[4],
}
self.monitor = Monitor(
# pub socket
sockets[5],
# route socket
sockets[6],
# exception socket to match tradesimclient's result
# socket, because we want to relay exceptions to the
# same listener
config['results_socket'],
send_sighup=self.send_sighup
)
self.started = False
self.sim = ProcessSimulator(addresses)
self.clients = {}
self.trading_client = TradeSimulationClient(
self.trading_environment,
self.sim_style,
config['results_socket'],
self.algorithm
)
self.add_client(self.trading_client)
# setup all sources
self.sources = {}
#setup transforms
self.transforms = {}
self.sim.register_monitor( self.monitor )
merged = merged_transforms(sorted, passthrough, *transforms)
trading_client = tsc(algorithm, environment, style)
return trading_client.simluate(merged)
@staticmethod
@@ -173,7 +104,6 @@ class SimulatedTrading(object):
- environment - a \
:py:class:`zipline.finance.trading.TradingEnvironment`
- allocator - a :py:class:`zipline.simulator.AddressAllocator`
- sid - an integer, which will be used as the security ID.
- order_count - the number of orders the test algo will place,
defaults to 100
@@ -188,10 +118,11 @@ class SimulatedTrading(object):
- simulation_style: optional parameter that configures the
:py:class:`zipline.finance.trading.TransactionSimulator`. Expects
a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading`
- transforms: optional parameter that provides a list
of StatefulTransform objects.
"""
assert isinstance(config, dict)
allocator = config['allocator']
sid = config['sid']
#--------------------
@@ -236,6 +167,12 @@ class SimulatedTrading(object):
trade_count,
trading_environment
)
#-------------------
# Transforms
#-------------------
transforms = config.get('transforms', [])
#-------------------
# Create the Algo
#-------------------
@@ -248,149 +185,20 @@ class SimulatedTrading(object):
order_count
)
if config.has_key('results_socket'):
results_socket = config['results_socket']
else:
results_socket = None
#-------------------
# Simulation
#-------------------
zipline = SimulatedTrading(**{
'algorithm' : test_algo,
'trading_environment' : trading_environment,
'allocator' : allocator,
'simulation_style' : simulation_style,
'results_socket' : results_socket,
})
sim = SimulatedTrading.create_simulation(
[trade_source],
transforms,
test_algo,
trading_environment,
simulation_style)
#-------------------
zipline.add_source(trade_source)
return sim
return zipline
def add_source(self, source):
"""
Adds the source to the zipline, sets the sid filter of the
source to the algorithm's sid filter.
"""
assert isinstance(source, DataSource)
self.check_started()
source.set_filter('sid', self.algorithm.get_sid_filter())
self.sim.register_components([source])
# ``id`` is name of source_id, ``get_id`` is the class name
self.sources[source.get_id] = source
def add_transform(self, transform):
assert isinstance(transform, BaseTransform)
self.check_started()
self.sim.register_components([transform])
self.transforms[transform.get_id] = transform
def add_client(self, client):
assert isinstance(client, TradeSimulationClient)
self.check_started()
self.sim.register_components([client])
self.clients[client.get_id] = client
def check_started(self):
if self.started:
raise ZiplineException("TradeSimulation", "You cannot add \
components after the simulation has begun.")
def get_cumulative_performance(self):
return self.trading_client.perf.cumulative_performance.to_dict()
def allocate_sockets(self, n):
"""
Allocate sockets local to this line, track them so
we can gc after test run.
"""
assert isinstance(n, int)
assert n > 0
leased = self.allocator.lease(n)
self.leased_sockets.extend(leased)
return leased
@property
def components(self):
"""
Return the component instances inside of this topology
"""
base = set(self.sim.components.values())
transforms = set(self.transforms.values())
sources = set(self.sources.values())
return base | transforms | sources
@property
def topology(self):
"""
Returns the Component names in the topology of the
backtest.
"""
# A complete topology is the union of three classes of
# components added individually to the simulation client
# at various places.
#
# base : ['FEED', 'MERGE', 'TRADING_CLIENT', 'PASSTHROUGH']
# transforms : ['vwap__01', ... ]
# sources : ['MongoTradeHistory', ... ]
base = set(self.sim.components.keys())
transforms = set(self.transforms.keys())
sources = set(self.sources.keys())
return base | transforms | sources
def setup_monitor(self):
"""
Prepare the monitor to manage the topology specified
by this line.
"""
self.monitor.manage(self.topology)
def simulate(self, blocking=True):
self.setup_monitor()
self.started = True
self.sim_context = self.sim.simulate()
# If we're using a threaded simulator block on the pool
# of thread since we're only ever in a test and we don't
# generally monitor the state of the system as a hold at
# the supervisory layer
# TODO: better way of identifying concurrency substrate
if blocking:
for process in self.sim.subprocesses:
process.join()
@property
def is_success(self):
# TODO: other assertions?
if self.sim.did_clean_shutdown():
return True
else:
return False
#--------------------------------
# Component property accessors
#--------------------------------
def get_positions(self):
"""
returns current positions as a dict. draws from the cumulative
performance period in the performance tracker.
"""
perf = self.trading_client.perf.cumulative_performance
positions = perf.get_positions()
return positions
class ZiplineException(Exception):
def __init__(self, zipline_name, msg):
+16 -33
View File
@@ -12,7 +12,9 @@ from datetime import datetime, timedelta
import zipline.finance.risk as risk
import zipline.protocol as zp
from zipline.finance.sources import SpecificEquityTrades, RandomEquityTrades
from zipline.finance.sources import RandomEquityTrades
from zipline.gens.tradegens import SpecificEquityTrades
from zipline.gens.utils import create_trade
from zipline.finance.trading import TradingEnvironment
# TODO
@@ -69,16 +71,6 @@ def create_trading_environment(year=2006):
return trading_environment
def create_trade(sid, price, amount, datetime, source_id = "test_factory"):
row = zp.ndict({
'source_id' : source_id,
'type' : zp.DATASOURCE_TYPE.TRADE,
'sid' : sid,
'dt' : datetime,
'price' : price,
'volume' : amount
})
return row
def get_next_trading_dt(current, interval, trading_calendar):
next = current
@@ -220,29 +212,20 @@ def create_minutely_trade_source(sids, trade_count, trading_environment):
)
def create_trade_source(sids, trade_count, trade_time_increment, trading_environment):
trade_history = []
price = [10.1] * trade_count
volume = [100] * trade_count
#Set up source a. One minute between events.
args = tuple()
kwargs = {
'count' : trade_count,
'sids' : sids,
'start' : trading_environment.first_open,
'delta' : trade_time_increment,
'filter' : sids
}
source = SpecificEquityTrades(*args, **kwargs)
for sid in sids:
start_date = trading_environment.first_open
# TODO: do we need to set the trading environment's end to same dt as
# the last trade in the history?
#trading_environment.period_end = trade_history[-1].dt
generated_trades = create_trade_history(
sid,
price,
volume,
trade_time_increment,
trading_environment
)
trade_history.extend(generated_trades)
trade_history = sorted(trade_history, key=attrgetter('dt'))
#set the trading environment's end to same dt as the last trade in the
#history.
trading_environment.period_end = trade_history[-1].dt
source = SpecificEquityTrades(trade_history)
return source