From 06dc6f7acb84cb4e73440bb39a5bdf762997ab12 Mon Sep 17 00:00:00 2001 From: fawce Date: Mon, 6 Aug 2012 13:11:20 -0400 Subject: [PATCH] beginning refactor to use single threaded simulator. --- zipline/__init__.py | 6 - zipline/gens/examples.py | 16 +-- zipline/gens/tradegens.py | 3 +- zipline/gens/utils.py | 12 ++ zipline/lines.py | 272 ++++++-------------------------------- zipline/utils/factory.py | 49 +++---- 6 files changed, 76 insertions(+), 282 deletions(-) diff --git a/zipline/__init__.py b/zipline/__init__.py index a84cd345..31272fcb 100644 --- a/zipline/__init__.py +++ b/zipline/__init__.py @@ -6,15 +6,9 @@ Zipline # it is a place to expose the public interfaces. import protocol # namespace -from core.monitor import Monitor -from lines import SimulatedTrading -from core.host import ComponentHost from utils.protocol_utils import ndict __all__ = [ - SimulatedTrading, - Monitor, - ComponentHost, protocol, ndict ] diff --git a/zipline/gens/examples.py b/zipline/gens/examples.py index a6a95f59..2f003230 100644 --- a/zipline/gens/examples.py +++ b/zipline/gens/examples.py @@ -1,6 +1,5 @@ import pytz -from time import sleep from pprint import pprint as pp from datetime import datetime, timedelta @@ -16,7 +15,7 @@ from zipline.gens.tradesimulation import TradeSimulationClient as tsc import zipline.protocol as zp if __name__ == "__main__": - + filter = [2,3] #Set up source a. One minute between events. args_a = tuple() @@ -42,19 +41,18 @@ if __name__ == "__main__": #Set up source c. Three minutes between events. - sorted = date_sorted_sources(source_a, source_b) - + sorted = date_sorted_sources(source_a, source_b) + passthrough = StatefulTransform(Passthrough) mavg_price = StatefulTransform(MovingAverage, timedelta(minutes = 20), ['price']) - + merged = merged_transforms(sorted, passthrough, mavg_price) - + algo = TestAlgorithm(2, 10, 100, sid_filter = [2,3]) environment = create_trading_environment(year = 2012) style = zp.SIMULATION_STYLE.FIXED_SLIPPAGE - + trading_client = tsc(algo, environment, style) - + for message in trading_client.simulate(merged): pp(message) - diff --git a/zipline/gens/tradegens.py b/zipline/gens/tradegens.py index b1a0ed96..5c1bacd7 100644 --- a/zipline/gens/tradegens.py +++ b/zipline/gens/tradegens.py @@ -6,8 +6,7 @@ import random from itertools import chain, cycle, ifilter, izip from datetime import datetime, timedelta -from zipline.utils.factory import create_trade -from zipline.gens.utils import hash_args +from zipline.gens.utils import hash_args, create_trade def date_gen(start = datetime(2006, 6, 6, 12), delta = timedelta(minutes = 1), diff --git a/zipline/gens/utils.py b/zipline/gens/utils.py index 071ce5fc..45e31de4 100644 --- a/zipline/gens/utils.py +++ b/zipline/gens/utils.py @@ -66,6 +66,18 @@ def hash_args(*args, **kwargs): hasher.update(combined) return hasher.hexdigest() +def create_trade(sid, price, amount, datetime, source_id = "test_factory"): + row = ndict({ + 'source_id' : source_id, + 'type' : DATASOURCE_TYPE.TRADE, + 'sid' : sid, + 'dt' : datetime, + 'price' : price, + 'volume' : amount + }) + return row + + def assert_datasource_protocol(event): """Assert that an event meets the protocol for datasource outputs.""" diff --git a/zipline/lines.py b/zipline/lines.py index a5a3858e..fbc4d155 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -60,110 +60,41 @@ before invoking simulate. +---------------------------------+ """ -import inspect import logbook -#import zipline.utils.factory as factory - -from zipline.components import DataSource -from zipline.transforms import BaseTransform from zipline.test_algorithms import TestAlgorithm -from zipline.components import TradeSimulationClient -from zipline.core.process import ProcessSimulator -from zipline.core.monitor import Monitor from zipline.finance.trading import SIMULATION_STYLE +from zipline.utils import factory +import pytz + +from pprint import pprint as pp +from datetime import datetime, timedelta + +from zipline.utils.factory import create_trading_environment +from zipline.test_algorithms import TestAlgorithm + +from zipline.gens.composites import SourceBundle, TransformBundle, \ + date_sorted_sources, merged_transforms +from zipline.gens.tradegens import SpecificEquityTrades +from zipline.gens.transform import MovingAverage, Passthrough, StatefulTransform +from zipline.gens.tradesimulation import TradeSimulationClient as tsc + +import zipline.protocol as zp + log = logbook.Logger('Lines') class SimulatedTrading(object): - """ - Zipline with:: - - _no_ data sources. - - Trade simulation client, which is available to send callbacks on - events and also accept orders to be simulated. - - An order data source, which will receive orders from the trade - simulation client, and feed them into the event stream to be - serialized and order alongside all other data source events. - - transaction simulation transformation, which receives the order - events and estimates a theoretical execution price and volume. + @staticmethod + def create_simulation(sources, transforms, algorithm, environment, style): - All components in this zipline are subject to heartbeat checks and - a control monitor, which can kill the entire zipline in the event of - exceptions in one of the components or an external request to end the - simulation. - """ + sorted = date_sorted_sources(*sources) + passthrough = StatefulTransform(Passthrough) - def __init__(self, **config): - """ - :param config: a dict with the following required properties:: - - - algorithm: a class that follows the algorithm protocol. See - :py:meth:`zipline.finance.trading.TradeSimulationClient.add_algorithm - for details. - - trading_environment: an instance of - :py:class:`zipline.trading.TradingEnvironment` - - allocator: an instance of - :py:class:`zipline.simulator.AddressAllocator` - - simulation_style: optional parameter that configures the - :py:class:`zipline.finance.trading.TransactionSimulator`. Expects - a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` - """ - assert isinstance(config, dict) - self.algorithm = config['algorithm'] - self.allocator = config['allocator'] - self.trading_environment = config['trading_environment'] - self.sim_style = config.get('simulation_style') - self.send_sighup = config.get('send_sighup', False) - - - self.leased_sockets = [] - self.sim_context = None - - sockets = self.allocate_sockets(7) - addresses = { - 'sync_address' : sockets[0], - 'data_address' : sockets[1], - 'feed_address' : sockets[2], - 'merge_address' : sockets[3], - # TODO: this refers to the results of the merge, a - # horribly confusing name for the socket. - 'results_address' : sockets[4], - } - - self.monitor = Monitor( - # pub socket - sockets[5], - # route socket - sockets[6], - # exception socket to match tradesimclient's result - # socket, because we want to relay exceptions to the - # same listener - config['results_socket'], - send_sighup=self.send_sighup - ) - - self.started = False - - self.sim = ProcessSimulator(addresses) - - self.clients = {} - - self.trading_client = TradeSimulationClient( - self.trading_environment, - self.sim_style, - config['results_socket'], - self.algorithm - ) - self.add_client(self.trading_client) - - # setup all sources - self.sources = {} - - #setup transforms - self.transforms = {} - - self.sim.register_monitor( self.monitor ) + merged = merged_transforms(sorted, passthrough, *transforms) + trading_client = tsc(algorithm, environment, style) + return trading_client.simluate(merged) @staticmethod @@ -173,7 +104,6 @@ class SimulatedTrading(object): - environment - a \ :py:class:`zipline.finance.trading.TradingEnvironment` - - allocator - a :py:class:`zipline.simulator.AddressAllocator` - sid - an integer, which will be used as the security ID. - order_count - the number of orders the test algo will place, defaults to 100 @@ -188,10 +118,11 @@ class SimulatedTrading(object): - simulation_style: optional parameter that configures the :py:class:`zipline.finance.trading.TransactionSimulator`. Expects a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` + - transforms: optional parameter that provides a list + of StatefulTransform objects. """ assert isinstance(config, dict) - allocator = config['allocator'] sid = config['sid'] #-------------------- @@ -236,6 +167,12 @@ class SimulatedTrading(object): trade_count, trading_environment ) + + #------------------- + # Transforms + #------------------- + transforms = config.get('transforms', []) + #------------------- # Create the Algo #------------------- @@ -248,149 +185,20 @@ class SimulatedTrading(object): order_count ) - if config.has_key('results_socket'): - results_socket = config['results_socket'] - else: - results_socket = None #------------------- # Simulation #------------------- - zipline = SimulatedTrading(**{ - 'algorithm' : test_algo, - 'trading_environment' : trading_environment, - 'allocator' : allocator, - 'simulation_style' : simulation_style, - 'results_socket' : results_socket, - }) + + sim = SimulatedTrading.create_simulation( + [trade_source], + transforms, + test_algo, + trading_environment, + simulation_style) #------------------- - zipline.add_source(trade_source) + return sim - return zipline - - def add_source(self, source): - """ - Adds the source to the zipline, sets the sid filter of the - source to the algorithm's sid filter. - """ - assert isinstance(source, DataSource) - self.check_started() - source.set_filter('sid', self.algorithm.get_sid_filter()) - self.sim.register_components([source]) - - # ``id`` is name of source_id, ``get_id`` is the class name - self.sources[source.get_id] = source - - def add_transform(self, transform): - assert isinstance(transform, BaseTransform) - self.check_started() - self.sim.register_components([transform]) - self.transforms[transform.get_id] = transform - - def add_client(self, client): - assert isinstance(client, TradeSimulationClient) - self.check_started() - self.sim.register_components([client]) - self.clients[client.get_id] = client - - def check_started(self): - if self.started: - raise ZiplineException("TradeSimulation", "You cannot add \ - components after the simulation has begun.") - - def get_cumulative_performance(self): - return self.trading_client.perf.cumulative_performance.to_dict() - - def allocate_sockets(self, n): - """ - Allocate sockets local to this line, track them so - we can gc after test run. - """ - - assert isinstance(n, int) - assert n > 0 - - leased = self.allocator.lease(n) - self.leased_sockets.extend(leased) - - return leased - - @property - def components(self): - """ - Return the component instances inside of this topology - """ - - base = set(self.sim.components.values()) - transforms = set(self.transforms.values()) - sources = set(self.sources.values()) - - return base | transforms | sources - - @property - def topology(self): - """ - Returns the Component names in the topology of the - backtest. - """ - - # A complete topology is the union of three classes of - # components added individually to the simulation client - # at various places. - # - # base : ['FEED', 'MERGE', 'TRADING_CLIENT', 'PASSTHROUGH'] - # transforms : ['vwap__01', ... ] - # sources : ['MongoTradeHistory', ... ] - - base = set(self.sim.components.keys()) - transforms = set(self.transforms.keys()) - sources = set(self.sources.keys()) - - return base | transforms | sources - - def setup_monitor(self): - """ - Prepare the monitor to manage the topology specified - by this line. - """ - self.monitor.manage(self.topology) - - def simulate(self, blocking=True): - self.setup_monitor() - - self.started = True - self.sim_context = self.sim.simulate() - - # If we're using a threaded simulator block on the pool - # of thread since we're only ever in a test and we don't - # generally monitor the state of the system as a hold at - # the supervisory layer - - # TODO: better way of identifying concurrency substrate - if blocking: - for process in self.sim.subprocesses: - process.join() - - @property - def is_success(self): - # TODO: other assertions? - if self.sim.did_clean_shutdown(): - return True - else: - return False - - #-------------------------------- - # Component property accessors - #-------------------------------- - - def get_positions(self): - """ - returns current positions as a dict. draws from the cumulative - performance period in the performance tracker. - """ - perf = self.trading_client.perf.cumulative_performance - positions = perf.get_positions() - return positions class ZiplineException(Exception): def __init__(self, zipline_name, msg): diff --git a/zipline/utils/factory.py b/zipline/utils/factory.py index 004f542a..e001cf07 100644 --- a/zipline/utils/factory.py +++ b/zipline/utils/factory.py @@ -12,7 +12,9 @@ from datetime import datetime, timedelta import zipline.finance.risk as risk import zipline.protocol as zp -from zipline.finance.sources import SpecificEquityTrades, RandomEquityTrades +from zipline.finance.sources import RandomEquityTrades +from zipline.gens.tradegens import SpecificEquityTrades +from zipline.gens.utils import create_trade from zipline.finance.trading import TradingEnvironment # TODO @@ -69,16 +71,6 @@ def create_trading_environment(year=2006): return trading_environment -def create_trade(sid, price, amount, datetime, source_id = "test_factory"): - row = zp.ndict({ - 'source_id' : source_id, - 'type' : zp.DATASOURCE_TYPE.TRADE, - 'sid' : sid, - 'dt' : datetime, - 'price' : price, - 'volume' : amount - }) - return row def get_next_trading_dt(current, interval, trading_calendar): next = current @@ -220,29 +212,20 @@ def create_minutely_trade_source(sids, trade_count, trading_environment): ) def create_trade_source(sids, trade_count, trade_time_increment, trading_environment): - trade_history = [] - price = [10.1] * trade_count - volume = [100] * trade_count + #Set up source a. One minute between events. + args = tuple() + kwargs = { + 'count' : trade_count, + 'sids' : sids, + 'start' : trading_environment.first_open, + 'delta' : trade_time_increment, + 'filter' : sids + } + source = SpecificEquityTrades(*args, **kwargs) - for sid in sids: - start_date = trading_environment.first_open + # TODO: do we need to set the trading environment's end to same dt as + # the last trade in the history? + #trading_environment.period_end = trade_history[-1].dt - generated_trades = create_trade_history( - sid, - price, - volume, - trade_time_increment, - trading_environment - ) - - trade_history.extend(generated_trades) - - trade_history = sorted(trade_history, key=attrgetter('dt')) - - #set the trading environment's end to same dt as the last trade in the - #history. - trading_environment.period_end = trade_history[-1].dt - - source = SpecificEquityTrades(trade_history) return source