From fade9758a80dfd0aca05425aaaf37880abbe6e30 Mon Sep 17 00:00:00 2001 From: Frederic Fortier Date: Tue, 15 Aug 2017 01:37:35 -0400 Subject: [PATCH] Trying to run a live algo --- catalyst/examples/buy_and_hold_live.py | 28 ++ catalyst/examples/buy_and_hold_test.py | 137 ++++++++ catalyst/exchange/exchange_clock.py | 4 +- catalyst/utils/run_algo.py | 53 +++- catalyst/utils/run_algo_original.py | 415 +++++++++++++++++++++++++ tests/exchange/test_clock.py | 13 +- 6 files changed, 629 insertions(+), 21 deletions(-) create mode 100644 catalyst/examples/buy_and_hold_live.py create mode 100644 catalyst/examples/buy_and_hold_test.py create mode 100644 catalyst/utils/run_algo_original.py diff --git a/catalyst/examples/buy_and_hold_live.py b/catalyst/examples/buy_and_hold_live.py new file mode 100644 index 00000000..e0016d86 --- /dev/null +++ b/catalyst/examples/buy_and_hold_live.py @@ -0,0 +1,28 @@ +# code +from catalyst.api import order, record, symbol +from catalyst.exchange.algorithm_exchange import ExchangeTradingAlgorithm +from datetime import timedelta +from catalyst.exchange.bitfinex import Bitfinex +import pandas as pd + +bitfinex = Bitfinex() + + +def initialize(context): + pass + + +def handle_data(context, data): + asset = bitfinex.get_asset('eth_usd') + test = data.current(asset, 'close') + order(symbol('AAPL'), 10) + + +algo_obj = ExchangeTradingAlgorithm( + initialize=initialize, + handle_data=handle_data, + start=pd.Timestamp.utcnow(), + end=pd.Timestamp.utcnow() + timedelta(hours=1), + exchange=bitfinex, +) +perf_manual = algo_obj.run() diff --git a/catalyst/examples/buy_and_hold_test.py b/catalyst/examples/buy_and_hold_test.py new file mode 100644 index 00000000..812d31e7 --- /dev/null +++ b/catalyst/examples/buy_and_hold_test.py @@ -0,0 +1,137 @@ +# code +import os +import re +from catalyst.api import order, record, symbol +from catalyst.exchange.algorithm_exchange import ExchangeTradingAlgorithm +from datetime import timedelta +from catalyst.exchange.bitfinex import Bitfinex +import pandas as pd +from catalyst.api import ( + order_target_value, + symbol, + record, + cancel_order, + get_open_orders, +) +from catalyst.algorithm import TradingAlgorithm +from catalyst.data.bundles.core import load +from catalyst.data.data_portal import DataPortal +from catalyst.data.loader import load_crypto_market_data +from catalyst.finance.trading import TradingEnvironment +from catalyst.pipeline.data import USEquityPricing, CryptoPricing +from catalyst.pipeline.loaders import ( + USEquityPricingLoader, + CryptoPricingLoader, +) +from catalyst.utils.calendars import get_calendar +from functools import partial + +bitfinex = Bitfinex() + + +def initialize(context): + context.ASSET_NAME = 'USDT_BTC' + context.TARGET_HODL_RATIO = 0.8 + context.RESERVE_RATIO = 1.0 - context.TARGET_HODL_RATIO + + # For all trading pairs in the poloniex bundle, the default denomination + # currently supported by Catalyst is 1/1000th of a full coin. Use this + # constant to scale the price of up to that of a full coin if desired. + context.TICK_SIZE = 1000.0 + + context.is_buying = True + context.asset = symbol(context.ASSET_NAME) + + context.i = 0 + pass + + +def handle_data(context, data): + context.i += 1 + + print 'i:', context.i + + starting_cash = context.portfolio.starting_cash + target_hodl_value = context.TARGET_HODL_RATIO * starting_cash + reserve_value = context.RESERVE_RATIO * starting_cash + + # Cancel any outstanding orders + orders = get_open_orders(context.asset) or [] + for order in orders: + cancel_order(order) + + # Stop buying after passing the reserve threshold + cash = context.portfolio.cash + if cash <= reserve_value: + context.is_buying = False + + # Retrieve current asset price from pricing data + price = data[context.asset].price + + # Check if still buying and could (approximately) afford another purchase + if context.is_buying and cash > price: + # Place order to make position in asset equal to target_hodl_value + order_target_value( + context.asset, + target_hodl_value, + limit_price=price * 1.1, + stop_price=price * 0.9, + ) + + record( + price=price, + cash=cash, + starting_cash=context.portfolio.starting_cash, + leverage=context.account.leverage, + ) + + +b = 'poloniex' +bundle_data = load( + b, + os.environ, + pd.Timestamp.utcnow() - timedelta(days=10), +) + +prefix, connstr = re.split( + r'sqlite:///', + str(bundle_data.asset_finder.engine.url), + maxsplit=1, +) +if prefix: + raise ValueError( + "invalid url %r, must begin with 'sqlite:///'" % + str(bundle_data.asset_finder.engine.url), + ) + +open_calendar = get_calendar('OPEN') + +env = TradingEnvironment( + load=partial(load_crypto_market_data, environ=os.environ), + bm_symbol='USDT_BTC', + trading_calendar=open_calendar, + asset_db_path=connstr, + environ=os.environ, +) + +first_trading_day = pd.Timestamp.utcnow() - timedelta(days=10) + +data = DataPortal( + env.asset_finder, + open_calendar, + first_trading_day=first_trading_day, + minute_reader=bundle_data.minute_bar_reader, + five_minute_reader=bundle_data.five_minute_bar_reader, + daily_reader=bundle_data.daily_bar_reader, + adjustment_reader=bundle_data.adjustment_reader, +) + +algo_obj = ExchangeTradingAlgorithm( + initialize=initialize, + handle_data=handle_data, + start=first_trading_day, + end=pd.Timestamp.utcnow() - timedelta(days=1), + exchange=bitfinex, +) + +perf_manual = algo_obj.run(data, overwrite_sim_params=False) diff --git a/catalyst/exchange/exchange_clock.py b/catalyst/exchange/exchange_clock.py index 4dee0586..f3866ce8 100644 --- a/catalyst/exchange/exchange_clock.py +++ b/catalyst/exchange/exchange_clock.py @@ -46,6 +46,7 @@ class ExchangeClock(object): before_trading_start_minutes=None, minute_emission=False, time_skew=pd.Timedelta("0s")): + self.sessions = sessions self.execution_opens = execution_opens self.execution_closes = execution_closes @@ -53,7 +54,7 @@ class ExchangeClock(object): self.minute_emission = minute_emission self.time_skew = time_skew self._last_emit = None - self._before_trading_start_bar_yielded = False + self._before_trading_start_bar_yielded = True def __iter__(self): yield pd.Timestamp.utcnow(), SESSION_START @@ -64,6 +65,7 @@ class ExchangeClock(object): if self._last_emit is None or server_time > self._last_emit: + print 'emitting bar %s' % server_time self._last_emit = server_time yield server_time, BAR diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index d63c314a..faa5ddd5 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -4,6 +4,10 @@ from runpy import run_path import sys import warnings + +import pandas as pd + + import click try: from pygments import highlight @@ -29,6 +33,9 @@ from catalyst.utils.calendars import get_calendar from catalyst.utils.factory import create_simulation_parameters import catalyst.utils.paths as pth +from catalyst.exchange.algorithm_exchange import ExchangeTradingAlgorithm +from catalyst.exchange.data_portal_exchange import DataPortalExchange + class _RunAlgoError(click.ClickException, ValueError): """Signal an error that should have a different message if invoked from @@ -68,7 +75,8 @@ def _run(handle_data, output, print_algo, local_namespace, - environ): + environ, + exchange): """Run a backtest for the given algorithm. This is shared between the cli and :func:`catalyst.run_algo`. @@ -87,7 +95,7 @@ def _run(handle_data, raise ValueError( 'invalid define %r, should be of the form name=value' % assign, - ) + ) try: # evaluate in the same namespace so names may refer to # eachother @@ -95,7 +103,7 @@ def _run(handle_data, except Exception as e: raise ValueError( 'failed to execute definition for name %r: %s' % (name, e), - ) + ) elif defines: raise _RunAlgoError( 'cannot pass define without `algotext`', @@ -144,7 +152,7 @@ def _run(handle_data, raise ValueError( "invalid url %r, must begin with 'sqlite:///'" % str(bundle_data.asset_finder.engine.url), - ) + ) open_calendar = get_calendar('OPEN') @@ -158,7 +166,10 @@ def _run(handle_data, first_trading_day = bundle_data.minute_bar_reader.first_trading_day - data = DataPortal( + DataPortalClass = (partial(DataPortalExchange, exchange) + if exchange + else DataPortal) + data = DataPortalClass( env.asset_finder, open_calendar, first_trading_day=first_trading_day, @@ -179,16 +190,16 @@ def _run(handle_data, if b == 'poloniex': return CryptoPricingLoader( - bundle_data, - data_frequency, - CryptoPricing, - ) + bundle_data, + data_frequency, + CryptoPricing, + ) elif b == 'quandl': return USEquityPricingLoader( - bundle_data, - data_frequency, - USEquityPricing, - ) + bundle_data, + data_frequency, + USEquityPricing, + ) raise ValueError( "No PipelineLoader registered for bundle %s." % b ) @@ -208,7 +219,14 @@ def _run(handle_data, env = TradingEnvironment(environ=environ) choose_loader = None - perf = TradingAlgorithm( + if exchange: + start = pd.Timestamp.utcnow() + end = start + pd.Timedelta('1', 'D') + + TradingAlgorithmClass = (partial(ExchangeTradingAlgorithm, exchange=exchange) + if exchange else TradingAlgorithm) + + perf = TradingAlgorithmClass( namespace=namespace, env=env, get_pipeline_loader=choose_loader, @@ -308,7 +326,9 @@ def run_algorithm(start, default_extension=True, extensions=(), strict_extensions=True, - environ=os.environ): + environ=os.environ, + live_trading=False, + tws_uri=None): """Run a trading algorithm. Parameters @@ -386,7 +406,7 @@ def run_algorithm(start, raise ValueError( 'must specify one of `data`, `data_portal`, or `bundle`,' ' got: %r' % non_none_data, - ) + ) elif 'bundle' not in non_none_data and bundle_timestamp is not None: raise ValueError( @@ -412,4 +432,5 @@ def run_algorithm(start, print_algo=False, local_namespace=False, environ=environ, + exchange=None, ) diff --git a/catalyst/utils/run_algo_original.py b/catalyst/utils/run_algo_original.py new file mode 100644 index 00000000..ade8dbf5 --- /dev/null +++ b/catalyst/utils/run_algo_original.py @@ -0,0 +1,415 @@ +import os +import re +from runpy import run_path +import sys +import warnings + +import click +try: + from pygments import highlight + from pygments.lexers import PythonLexer + from pygments.formatters import TerminalFormatter + PYGMENTS = True +except: + PYGMENTS = False +from toolz import valfilter, concatv +from functools import partial + +from catalyst.algorithm import TradingAlgorithm +from catalyst.data.bundles.core import load +from catalyst.data.data_portal import DataPortal +from catalyst.data.loader import load_crypto_market_data +from catalyst.finance.trading import TradingEnvironment +from catalyst.pipeline.data import USEquityPricing, CryptoPricing +from catalyst.pipeline.loaders import ( + USEquityPricingLoader, + CryptoPricingLoader, +) +from catalyst.utils.calendars import get_calendar +from catalyst.utils.factory import create_simulation_parameters +import catalyst.utils.paths as pth + + +class _RunAlgoError(click.ClickException, ValueError): + """Signal an error that should have a different message if invoked from + the cli. + + Parameters + ---------- + pyfunc_msg : str + The message that will be shown when called as a python function. + cmdline_msg : str + The message that will be shown on the command line. + """ + exit_code = 1 + + def __init__(self, pyfunc_msg, cmdline_msg): + super(_RunAlgoError, self).__init__(cmdline_msg) + self.pyfunc_msg = pyfunc_msg + + def __str__(self): + return self.pyfunc_msg + + +def _run(handle_data, + initialize, + before_trading_start, + analyze, + algofile, + algotext, + defines, + data_frequency, + capital_base, + data, + bundle, + bundle_timestamp, + start, + end, + output, + print_algo, + local_namespace, + environ): + """Run a backtest for the given algorithm. + + This is shared between the cli and :func:`catalyst.run_algo`. + """ + if algotext is not None: + if local_namespace: + ip = get_ipython() # noqa + namespace = ip.user_ns + else: + namespace = {} + + for assign in defines: + try: + name, value = assign.split('=', 2) + except ValueError: + raise ValueError( + 'invalid define %r, should be of the form name=value' % + assign, + ) + try: + # evaluate in the same namespace so names may refer to + # eachother + namespace[name] = eval(value, namespace) + except Exception as e: + raise ValueError( + 'failed to execute definition for name %r: %s' % (name, e), + ) + elif defines: + raise _RunAlgoError( + 'cannot pass define without `algotext`', + "cannot pass '-D' / '--define' without '-t' / '--algotext'", + ) + else: + namespace = {} + if algofile is not None: + algotext = algofile.read() + + if print_algo: + if PYGMENTS: + highlight( + algotext, + PythonLexer(), + TerminalFormatter(), + outfile=sys.stdout, + ) + else: + click.echo(algotext) + + if bundle is not None: + bundles = bundle.split(',') + + def get_trading_env_and_data(bundles): + env = data = None + + b = 'poloniex' + if len(bundles) == 0: + return env, data + elif len(bundles) == 1: + b = bundles[0] + + bundle_data = load( + b, + environ, + bundle_timestamp, + ) + + prefix, connstr = re.split( + r'sqlite:///', + str(bundle_data.asset_finder.engine.url), + maxsplit=1, + ) + if prefix: + raise ValueError( + "invalid url %r, must begin with 'sqlite:///'" % + str(bundle_data.asset_finder.engine.url), + ) + + open_calendar = get_calendar('OPEN') + + env = TradingEnvironment( + load=partial(load_crypto_market_data, environ=environ), + bm_symbol='USDT_BTC', + trading_calendar=open_calendar, + asset_db_path=connstr, + environ=environ, + ) + + first_trading_day = bundle_data.minute_bar_reader.first_trading_day + + data = DataPortal( + env.asset_finder, + open_calendar, + first_trading_day=first_trading_day, + minute_reader=bundle_data.minute_bar_reader, + five_minute_reader=bundle_data.five_minute_bar_reader, + daily_reader=bundle_data.daily_bar_reader, + adjustment_reader=bundle_data.adjustment_reader, + ) + + return env, data + + def get_loader_for_bundle(b): + bundle_data = load( + b, + environ, + bundle_timestamp, + ) + + if b == 'poloniex': + return CryptoPricingLoader( + bundle_data, + data_frequency, + CryptoPricing, + ) + elif b == 'quandl': + return USEquityPricingLoader( + bundle_data, + data_frequency, + USEquityPricing, + ) + raise ValueError( + "No PipelineLoader registered for bundle %s." % b + ) + + loaders = [get_loader_for_bundle(b) for b in bundles] + env, data = get_trading_env_and_data(bundles) + + def choose_loader(column): + for loader in loaders: + if column in loader.columns: + return loader + raise ValueError( + "No PipelineLoader registered for column %s." % column + ) + + else: + env = TradingEnvironment(environ=environ) + choose_loader = None + + perf = TradingAlgorithm( + namespace=namespace, + env=env, + get_pipeline_loader=choose_loader, + sim_params=create_simulation_parameters( + start=start, + end=end, + capital_base=capital_base, + data_frequency=data_frequency, + emission_rate=data_frequency, + ), + **{ + 'initialize': initialize, + 'handle_data': handle_data, + 'before_trading_start': before_trading_start, + 'analyze': analyze, + } if algotext is None else { + 'algo_filename': getattr(algofile, 'name', ''), + 'script': algotext, + } + ).run( + data, + overwrite_sim_params=False, + ) + + if output == '-': + click.echo(str(perf)) + elif output != os.devnull: # make the catalyst magic not write any data + perf.to_pickle(output) + + return perf + + +# All of the loaded extensions. We don't want to load an extension twice. +_loaded_extensions = set() + + +def load_extensions(default, extensions, strict, environ, reload=False): + """Load all of the given extensions. This should be called by run_algo + or the cli. + + Parameters + ---------- + default : bool + Load the default exension (~/.catalyst/extension.py)? + extension : iterable[str] + The paths to the extensions to load. If the path ends in ``.py`` it is + treated as a script and executed. If it does not end in ``.py`` it is + treated as a module to be imported. + strict : bool + Should failure to load an extension raise. If this is false it will + still warn. + environ : mapping + The environment to use to find the default extension path. + reload : bool, optional + Reload any extensions that have already been loaded. + """ + if default: + default_extension_path = pth.default_extension(environ=environ) + pth.ensure_file(default_extension_path) + # put the default extension first so other extensions can depend on + # the order they are loaded + extensions = concatv([default_extension_path], extensions) + + for ext in extensions: + if ext in _loaded_extensions and not reload: + continue + try: + # load all of the catalyst extensionss + if ext.endswith('.py'): + run_path(ext, run_name='') + else: + __import__(ext) + except Exception as e: + if strict: + # if `strict` we should raise the actual exception and fail + raise + # without `strict` we should just log the failure + warnings.warn( + 'Failed to load extension: %r\n%s' % (ext, e), + stacklevel=2 + ) + else: + _loaded_extensions.add(ext) + + +def run_algorithm(start, + end, + initialize, + capital_base, + handle_data=None, + before_trading_start=None, + analyze=None, + data_frequency='daily', + data=None, + bundle=None, + bundle_timestamp=None, + default_extension=True, + extensions=(), + strict_extensions=True, + environ=os.environ): + """Run a trading algorithm. + + Parameters + ---------- + start : datetime + The start date of the backtest. + end : datetime + The end date of the backtest.. + initialize : callable[context -> None] + The initialize function to use for the algorithm. This is called once + at the very begining of the backtest and should be used to set up + any state needed by the algorithm. + capital_base : float + The starting capital for the backtest. + handle_data : callable[(context, BarData) -> None], optional + The handle_data function to use for the algorithm. This is called + every minute when ``data_frequency == 'minute'`` or every day + when ``data_frequency == 'daily'``. + before_trading_start : callable[(context, BarData) -> None], optional + The before_trading_start function for the algorithm. This is called + once before each trading day (after initialize on the first day). + analyze : callable[(context, pd.DataFrame) -> None], optional + The analyze function to use for the algorithm. This function is called + once at the end of the backtest and is passed the context and the + performance data. + data_frequency : {'daily', 'minute'}, optional + The data frequency to run the algorithm at. + data : pd.DataFrame, pd.Panel, or DataPortal, optional + The ohlcv data to run the backtest with. + This argument is mutually exclusive with: + ``bundle`` + ``bundle_timestamp`` + bundle : str, optional + The name of the data bundle to use to load the data to run the backtest + with. This defaults to 'quantopian-quandl'. + This argument is mutually exclusive with ``data``. + bundle_timestamp : datetime, optional + The datetime to lookup the bundle data for. This defaults to the + current time. + This argument is mutually exclusive with ``data``. + default_extension : bool, optional + Should the default catalyst extension be loaded. This is found at + ``$ZIPLINE_ROOT/extension.py`` + extensions : iterable[str], optional + The names of any other extensions to load. Each element may either be + a dotted module path like ``a.b.c`` or a path to a python file ending + in ``.py`` like ``a/b/c.py``. + strict_extensions : bool, optional + Should the run fail if any extensions fail to load. If this is false, + a warning will be raised instead. + environ : mapping[str -> str], optional + The os environment to use. Many extensions use this to get parameters. + This defaults to ``os.environ``. + + Returns + ------- + perf : pd.DataFrame + The daily performance of the algorithm. + + See Also + -------- + catalyst.data.bundles.bundles : The available data bundles. + """ + load_extensions(default_extension, extensions, strict_extensions, environ) + + non_none_data = valfilter(bool, { + 'data': data is not None, + 'bundle': bundle is not None, + }) + if not non_none_data: + # if neither data nor bundle are passed use 'quantopian-quandl' + bundle = 'quantopian-quandl' + + elif len(non_none_data) != 1: + raise ValueError( + 'must specify one of `data`, `data_portal`, or `bundle`,' + ' got: %r' % non_none_data, + ) + + elif 'bundle' not in non_none_data and bundle_timestamp is not None: + raise ValueError( + 'cannot specify `bundle_timestamp` without passing `bundle`', + ) + + return _run( + handle_data=handle_data, + initialize=initialize, + before_trading_start=before_trading_start, + analyze=analyze, + algofile=None, + algotext=None, + defines=(), + data_frequency=data_frequency, + capital_base=capital_base, + data=data, + bundle=bundle, + bundle_timestamp=bundle_timestamp, + start=start, + end=end, + output=os.devnull, + print_algo=False, + local_namespace=False, + environ=environ, + ) diff --git a/tests/exchange/test_clock.py b/tests/exchange/test_clock.py index eeafc8ac..94414d27 100644 --- a/tests/exchange/test_clock.py +++ b/tests/exchange/test_clock.py @@ -8,10 +8,16 @@ from collections import defaultdict from catalyst.utils.calendars import get_calendar import pandas as pd -log = Logger('BitfinexTestCase') +log = Logger('ExchangeClockTestCase') -class BitfinexTestCase(TestCase): +class ExchangeClockTestCase(TestCase): + @classmethod + def setUpClass(cls): + cls.open_calendar = get_calendar("OPEN") + + cls.sessions = pd.Timestamp.utcnow() + def setUp(self): self.internal_clock = None self.events = defaultdict(list) @@ -31,13 +37,12 @@ class BitfinexTestCase(TestCase): def test_clock(self): with patch('catalyst.exchange.exchange_clock.pd.to_datetime') as to_dt, \ patch('catalyst.exchange.exchange_clock.sleep') as sleep: - clock = ExchangeClock() + clock = ExchangeClock(sessions=self.sessions) to_dt.side_effect = self.get_clock sleep.side_effect = self.advance_clock start_time = pd.Timestamp.utcnow() self.internal_clock = start_time - log.info('listing events') events = list(clock) # Event 0 is SESSION_START which always happens at 00:00.