Trying to run a live algo

This commit is contained in:
Frederic Fortier
2017-08-15 01:37:35 -04:00
parent 270175b6c4
commit fade9758a8
6 changed files with 629 additions and 21 deletions
+28
View File
@@ -0,0 +1,28 @@
# code
from catalyst.api import order, record, symbol
from catalyst.exchange.algorithm_exchange import ExchangeTradingAlgorithm
from datetime import timedelta
from catalyst.exchange.bitfinex import Bitfinex
import pandas as pd
bitfinex = Bitfinex()
def initialize(context):
pass
def handle_data(context, data):
asset = bitfinex.get_asset('eth_usd')
test = data.current(asset, 'close')
order(symbol('AAPL'), 10)
algo_obj = ExchangeTradingAlgorithm(
initialize=initialize,
handle_data=handle_data,
start=pd.Timestamp.utcnow(),
end=pd.Timestamp.utcnow() + timedelta(hours=1),
exchange=bitfinex,
)
perf_manual = algo_obj.run()
+137
View File
@@ -0,0 +1,137 @@
# code
import os
import re
from catalyst.api import order, record, symbol
from catalyst.exchange.algorithm_exchange import ExchangeTradingAlgorithm
from datetime import timedelta
from catalyst.exchange.bitfinex import Bitfinex
import pandas as pd
from catalyst.api import (
order_target_value,
symbol,
record,
cancel_order,
get_open_orders,
)
from catalyst.algorithm import TradingAlgorithm
from catalyst.data.bundles.core import load
from catalyst.data.data_portal import DataPortal
from catalyst.data.loader import load_crypto_market_data
from catalyst.finance.trading import TradingEnvironment
from catalyst.pipeline.data import USEquityPricing, CryptoPricing
from catalyst.pipeline.loaders import (
USEquityPricingLoader,
CryptoPricingLoader,
)
from catalyst.utils.calendars import get_calendar
from functools import partial
bitfinex = Bitfinex()
def initialize(context):
context.ASSET_NAME = 'USDT_BTC'
context.TARGET_HODL_RATIO = 0.8
context.RESERVE_RATIO = 1.0 - context.TARGET_HODL_RATIO
# For all trading pairs in the poloniex bundle, the default denomination
# currently supported by Catalyst is 1/1000th of a full coin. Use this
# constant to scale the price of up to that of a full coin if desired.
context.TICK_SIZE = 1000.0
context.is_buying = True
context.asset = symbol(context.ASSET_NAME)
context.i = 0
pass
def handle_data(context, data):
context.i += 1
print 'i:', context.i
starting_cash = context.portfolio.starting_cash
target_hodl_value = context.TARGET_HODL_RATIO * starting_cash
reserve_value = context.RESERVE_RATIO * starting_cash
# Cancel any outstanding orders
orders = get_open_orders(context.asset) or []
for order in orders:
cancel_order(order)
# Stop buying after passing the reserve threshold
cash = context.portfolio.cash
if cash <= reserve_value:
context.is_buying = False
# Retrieve current asset price from pricing data
price = data[context.asset].price
# Check if still buying and could (approximately) afford another purchase
if context.is_buying and cash > price:
# Place order to make position in asset equal to target_hodl_value
order_target_value(
context.asset,
target_hodl_value,
limit_price=price * 1.1,
stop_price=price * 0.9,
)
record(
price=price,
cash=cash,
starting_cash=context.portfolio.starting_cash,
leverage=context.account.leverage,
)
b = 'poloniex'
bundle_data = load(
b,
os.environ,
pd.Timestamp.utcnow() - timedelta(days=10),
)
prefix, connstr = re.split(
r'sqlite:///',
str(bundle_data.asset_finder.engine.url),
maxsplit=1,
)
if prefix:
raise ValueError(
"invalid url %r, must begin with 'sqlite:///'" %
str(bundle_data.asset_finder.engine.url),
)
open_calendar = get_calendar('OPEN')
env = TradingEnvironment(
load=partial(load_crypto_market_data, environ=os.environ),
bm_symbol='USDT_BTC',
trading_calendar=open_calendar,
asset_db_path=connstr,
environ=os.environ,
)
first_trading_day = pd.Timestamp.utcnow() - timedelta(days=10)
data = DataPortal(
env.asset_finder,
open_calendar,
first_trading_day=first_trading_day,
minute_reader=bundle_data.minute_bar_reader,
five_minute_reader=bundle_data.five_minute_bar_reader,
daily_reader=bundle_data.daily_bar_reader,
adjustment_reader=bundle_data.adjustment_reader,
)
algo_obj = ExchangeTradingAlgorithm(
initialize=initialize,
handle_data=handle_data,
start=first_trading_day,
end=pd.Timestamp.utcnow() - timedelta(days=1),
exchange=bitfinex,
)
perf_manual = algo_obj.run(data, overwrite_sim_params=False)
+3 -1
View File
@@ -46,6 +46,7 @@ class ExchangeClock(object):
before_trading_start_minutes=None,
minute_emission=False,
time_skew=pd.Timedelta("0s")):
self.sessions = sessions
self.execution_opens = execution_opens
self.execution_closes = execution_closes
@@ -53,7 +54,7 @@ class ExchangeClock(object):
self.minute_emission = minute_emission
self.time_skew = time_skew
self._last_emit = None
self._before_trading_start_bar_yielded = False
self._before_trading_start_bar_yielded = True
def __iter__(self):
yield pd.Timestamp.utcnow(), SESSION_START
@@ -64,6 +65,7 @@ class ExchangeClock(object):
if self._last_emit is None or server_time > self._last_emit:
print 'emitting bar %s' % server_time
self._last_emit = server_time
yield server_time, BAR
+37 -16
View File
@@ -4,6 +4,10 @@ from runpy import run_path
import sys
import warnings
import pandas as pd
import click
try:
from pygments import highlight
@@ -29,6 +33,9 @@ from catalyst.utils.calendars import get_calendar
from catalyst.utils.factory import create_simulation_parameters
import catalyst.utils.paths as pth
from catalyst.exchange.algorithm_exchange import ExchangeTradingAlgorithm
from catalyst.exchange.data_portal_exchange import DataPortalExchange
class _RunAlgoError(click.ClickException, ValueError):
"""Signal an error that should have a different message if invoked from
@@ -68,7 +75,8 @@ def _run(handle_data,
output,
print_algo,
local_namespace,
environ):
environ,
exchange):
"""Run a backtest for the given algorithm.
This is shared between the cli and :func:`catalyst.run_algo`.
@@ -87,7 +95,7 @@ def _run(handle_data,
raise ValueError(
'invalid define %r, should be of the form name=value' %
assign,
)
)
try:
# evaluate in the same namespace so names may refer to
# eachother
@@ -95,7 +103,7 @@ def _run(handle_data,
except Exception as e:
raise ValueError(
'failed to execute definition for name %r: %s' % (name, e),
)
)
elif defines:
raise _RunAlgoError(
'cannot pass define without `algotext`',
@@ -144,7 +152,7 @@ def _run(handle_data,
raise ValueError(
"invalid url %r, must begin with 'sqlite:///'" %
str(bundle_data.asset_finder.engine.url),
)
)
open_calendar = get_calendar('OPEN')
@@ -158,7 +166,10 @@ def _run(handle_data,
first_trading_day = bundle_data.minute_bar_reader.first_trading_day
data = DataPortal(
DataPortalClass = (partial(DataPortalExchange, exchange)
if exchange
else DataPortal)
data = DataPortalClass(
env.asset_finder,
open_calendar,
first_trading_day=first_trading_day,
@@ -179,16 +190,16 @@ def _run(handle_data,
if b == 'poloniex':
return CryptoPricingLoader(
bundle_data,
data_frequency,
CryptoPricing,
)
bundle_data,
data_frequency,
CryptoPricing,
)
elif b == 'quandl':
return USEquityPricingLoader(
bundle_data,
data_frequency,
USEquityPricing,
)
bundle_data,
data_frequency,
USEquityPricing,
)
raise ValueError(
"No PipelineLoader registered for bundle %s." % b
)
@@ -208,7 +219,14 @@ def _run(handle_data,
env = TradingEnvironment(environ=environ)
choose_loader = None
perf = TradingAlgorithm(
if exchange:
start = pd.Timestamp.utcnow()
end = start + pd.Timedelta('1', 'D')
TradingAlgorithmClass = (partial(ExchangeTradingAlgorithm, exchange=exchange)
if exchange else TradingAlgorithm)
perf = TradingAlgorithmClass(
namespace=namespace,
env=env,
get_pipeline_loader=choose_loader,
@@ -308,7 +326,9 @@ def run_algorithm(start,
default_extension=True,
extensions=(),
strict_extensions=True,
environ=os.environ):
environ=os.environ,
live_trading=False,
tws_uri=None):
"""Run a trading algorithm.
Parameters
@@ -386,7 +406,7 @@ def run_algorithm(start,
raise ValueError(
'must specify one of `data`, `data_portal`, or `bundle`,'
' got: %r' % non_none_data,
)
)
elif 'bundle' not in non_none_data and bundle_timestamp is not None:
raise ValueError(
@@ -412,4 +432,5 @@ def run_algorithm(start,
print_algo=False,
local_namespace=False,
environ=environ,
exchange=None,
)
+415
View File
@@ -0,0 +1,415 @@
import os
import re
from runpy import run_path
import sys
import warnings
import click
try:
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import TerminalFormatter
PYGMENTS = True
except:
PYGMENTS = False
from toolz import valfilter, concatv
from functools import partial
from catalyst.algorithm import TradingAlgorithm
from catalyst.data.bundles.core import load
from catalyst.data.data_portal import DataPortal
from catalyst.data.loader import load_crypto_market_data
from catalyst.finance.trading import TradingEnvironment
from catalyst.pipeline.data import USEquityPricing, CryptoPricing
from catalyst.pipeline.loaders import (
USEquityPricingLoader,
CryptoPricingLoader,
)
from catalyst.utils.calendars import get_calendar
from catalyst.utils.factory import create_simulation_parameters
import catalyst.utils.paths as pth
class _RunAlgoError(click.ClickException, ValueError):
"""Signal an error that should have a different message if invoked from
the cli.
Parameters
----------
pyfunc_msg : str
The message that will be shown when called as a python function.
cmdline_msg : str
The message that will be shown on the command line.
"""
exit_code = 1
def __init__(self, pyfunc_msg, cmdline_msg):
super(_RunAlgoError, self).__init__(cmdline_msg)
self.pyfunc_msg = pyfunc_msg
def __str__(self):
return self.pyfunc_msg
def _run(handle_data,
initialize,
before_trading_start,
analyze,
algofile,
algotext,
defines,
data_frequency,
capital_base,
data,
bundle,
bundle_timestamp,
start,
end,
output,
print_algo,
local_namespace,
environ):
"""Run a backtest for the given algorithm.
This is shared between the cli and :func:`catalyst.run_algo`.
"""
if algotext is not None:
if local_namespace:
ip = get_ipython() # noqa
namespace = ip.user_ns
else:
namespace = {}
for assign in defines:
try:
name, value = assign.split('=', 2)
except ValueError:
raise ValueError(
'invalid define %r, should be of the form name=value' %
assign,
)
try:
# evaluate in the same namespace so names may refer to
# eachother
namespace[name] = eval(value, namespace)
except Exception as e:
raise ValueError(
'failed to execute definition for name %r: %s' % (name, e),
)
elif defines:
raise _RunAlgoError(
'cannot pass define without `algotext`',
"cannot pass '-D' / '--define' without '-t' / '--algotext'",
)
else:
namespace = {}
if algofile is not None:
algotext = algofile.read()
if print_algo:
if PYGMENTS:
highlight(
algotext,
PythonLexer(),
TerminalFormatter(),
outfile=sys.stdout,
)
else:
click.echo(algotext)
if bundle is not None:
bundles = bundle.split(',')
def get_trading_env_and_data(bundles):
env = data = None
b = 'poloniex'
if len(bundles) == 0:
return env, data
elif len(bundles) == 1:
b = bundles[0]
bundle_data = load(
b,
environ,
bundle_timestamp,
)
prefix, connstr = re.split(
r'sqlite:///',
str(bundle_data.asset_finder.engine.url),
maxsplit=1,
)
if prefix:
raise ValueError(
"invalid url %r, must begin with 'sqlite:///'" %
str(bundle_data.asset_finder.engine.url),
)
open_calendar = get_calendar('OPEN')
env = TradingEnvironment(
load=partial(load_crypto_market_data, environ=environ),
bm_symbol='USDT_BTC',
trading_calendar=open_calendar,
asset_db_path=connstr,
environ=environ,
)
first_trading_day = bundle_data.minute_bar_reader.first_trading_day
data = DataPortal(
env.asset_finder,
open_calendar,
first_trading_day=first_trading_day,
minute_reader=bundle_data.minute_bar_reader,
five_minute_reader=bundle_data.five_minute_bar_reader,
daily_reader=bundle_data.daily_bar_reader,
adjustment_reader=bundle_data.adjustment_reader,
)
return env, data
def get_loader_for_bundle(b):
bundle_data = load(
b,
environ,
bundle_timestamp,
)
if b == 'poloniex':
return CryptoPricingLoader(
bundle_data,
data_frequency,
CryptoPricing,
)
elif b == 'quandl':
return USEquityPricingLoader(
bundle_data,
data_frequency,
USEquityPricing,
)
raise ValueError(
"No PipelineLoader registered for bundle %s." % b
)
loaders = [get_loader_for_bundle(b) for b in bundles]
env, data = get_trading_env_and_data(bundles)
def choose_loader(column):
for loader in loaders:
if column in loader.columns:
return loader
raise ValueError(
"No PipelineLoader registered for column %s." % column
)
else:
env = TradingEnvironment(environ=environ)
choose_loader = None
perf = TradingAlgorithm(
namespace=namespace,
env=env,
get_pipeline_loader=choose_loader,
sim_params=create_simulation_parameters(
start=start,
end=end,
capital_base=capital_base,
data_frequency=data_frequency,
emission_rate=data_frequency,
),
**{
'initialize': initialize,
'handle_data': handle_data,
'before_trading_start': before_trading_start,
'analyze': analyze,
} if algotext is None else {
'algo_filename': getattr(algofile, 'name', '<algorithm>'),
'script': algotext,
}
).run(
data,
overwrite_sim_params=False,
)
if output == '-':
click.echo(str(perf))
elif output != os.devnull: # make the catalyst magic not write any data
perf.to_pickle(output)
return perf
# All of the loaded extensions. We don't want to load an extension twice.
_loaded_extensions = set()
def load_extensions(default, extensions, strict, environ, reload=False):
"""Load all of the given extensions. This should be called by run_algo
or the cli.
Parameters
----------
default : bool
Load the default exension (~/.catalyst/extension.py)?
extension : iterable[str]
The paths to the extensions to load. If the path ends in ``.py`` it is
treated as a script and executed. If it does not end in ``.py`` it is
treated as a module to be imported.
strict : bool
Should failure to load an extension raise. If this is false it will
still warn.
environ : mapping
The environment to use to find the default extension path.
reload : bool, optional
Reload any extensions that have already been loaded.
"""
if default:
default_extension_path = pth.default_extension(environ=environ)
pth.ensure_file(default_extension_path)
# put the default extension first so other extensions can depend on
# the order they are loaded
extensions = concatv([default_extension_path], extensions)
for ext in extensions:
if ext in _loaded_extensions and not reload:
continue
try:
# load all of the catalyst extensionss
if ext.endswith('.py'):
run_path(ext, run_name='<extension>')
else:
__import__(ext)
except Exception as e:
if strict:
# if `strict` we should raise the actual exception and fail
raise
# without `strict` we should just log the failure
warnings.warn(
'Failed to load extension: %r\n%s' % (ext, e),
stacklevel=2
)
else:
_loaded_extensions.add(ext)
def run_algorithm(start,
end,
initialize,
capital_base,
handle_data=None,
before_trading_start=None,
analyze=None,
data_frequency='daily',
data=None,
bundle=None,
bundle_timestamp=None,
default_extension=True,
extensions=(),
strict_extensions=True,
environ=os.environ):
"""Run a trading algorithm.
Parameters
----------
start : datetime
The start date of the backtest.
end : datetime
The end date of the backtest..
initialize : callable[context -> None]
The initialize function to use for the algorithm. This is called once
at the very begining of the backtest and should be used to set up
any state needed by the algorithm.
capital_base : float
The starting capital for the backtest.
handle_data : callable[(context, BarData) -> None], optional
The handle_data function to use for the algorithm. This is called
every minute when ``data_frequency == 'minute'`` or every day
when ``data_frequency == 'daily'``.
before_trading_start : callable[(context, BarData) -> None], optional
The before_trading_start function for the algorithm. This is called
once before each trading day (after initialize on the first day).
analyze : callable[(context, pd.DataFrame) -> None], optional
The analyze function to use for the algorithm. This function is called
once at the end of the backtest and is passed the context and the
performance data.
data_frequency : {'daily', 'minute'}, optional
The data frequency to run the algorithm at.
data : pd.DataFrame, pd.Panel, or DataPortal, optional
The ohlcv data to run the backtest with.
This argument is mutually exclusive with:
``bundle``
``bundle_timestamp``
bundle : str, optional
The name of the data bundle to use to load the data to run the backtest
with. This defaults to 'quantopian-quandl'.
This argument is mutually exclusive with ``data``.
bundle_timestamp : datetime, optional
The datetime to lookup the bundle data for. This defaults to the
current time.
This argument is mutually exclusive with ``data``.
default_extension : bool, optional
Should the default catalyst extension be loaded. This is found at
``$ZIPLINE_ROOT/extension.py``
extensions : iterable[str], optional
The names of any other extensions to load. Each element may either be
a dotted module path like ``a.b.c`` or a path to a python file ending
in ``.py`` like ``a/b/c.py``.
strict_extensions : bool, optional
Should the run fail if any extensions fail to load. If this is false,
a warning will be raised instead.
environ : mapping[str -> str], optional
The os environment to use. Many extensions use this to get parameters.
This defaults to ``os.environ``.
Returns
-------
perf : pd.DataFrame
The daily performance of the algorithm.
See Also
--------
catalyst.data.bundles.bundles : The available data bundles.
"""
load_extensions(default_extension, extensions, strict_extensions, environ)
non_none_data = valfilter(bool, {
'data': data is not None,
'bundle': bundle is not None,
})
if not non_none_data:
# if neither data nor bundle are passed use 'quantopian-quandl'
bundle = 'quantopian-quandl'
elif len(non_none_data) != 1:
raise ValueError(
'must specify one of `data`, `data_portal`, or `bundle`,'
' got: %r' % non_none_data,
)
elif 'bundle' not in non_none_data and bundle_timestamp is not None:
raise ValueError(
'cannot specify `bundle_timestamp` without passing `bundle`',
)
return _run(
handle_data=handle_data,
initialize=initialize,
before_trading_start=before_trading_start,
analyze=analyze,
algofile=None,
algotext=None,
defines=(),
data_frequency=data_frequency,
capital_base=capital_base,
data=data,
bundle=bundle,
bundle_timestamp=bundle_timestamp,
start=start,
end=end,
output=os.devnull,
print_algo=False,
local_namespace=False,
environ=environ,
)
+9 -4
View File
@@ -8,10 +8,16 @@ from collections import defaultdict
from catalyst.utils.calendars import get_calendar
import pandas as pd
log = Logger('BitfinexTestCase')
log = Logger('ExchangeClockTestCase')
class BitfinexTestCase(TestCase):
class ExchangeClockTestCase(TestCase):
@classmethod
def setUpClass(cls):
cls.open_calendar = get_calendar("OPEN")
cls.sessions = pd.Timestamp.utcnow()
def setUp(self):
self.internal_clock = None
self.events = defaultdict(list)
@@ -31,13 +37,12 @@ class BitfinexTestCase(TestCase):
def test_clock(self):
with patch('catalyst.exchange.exchange_clock.pd.to_datetime') as to_dt, \
patch('catalyst.exchange.exchange_clock.sleep') as sleep:
clock = ExchangeClock()
clock = ExchangeClock(sessions=self.sessions)
to_dt.side_effect = self.get_clock
sleep.side_effect = self.advance_clock
start_time = pd.Timestamp.utcnow()
self.internal_clock = start_time
log.info('listing events')
events = list(clock)
# Event 0 is SESSION_START which always happens at 00:00.