diff --git a/catalyst/__main__.py b/catalyst/__main__.py index c6322f37..f57a52b4 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -8,6 +8,7 @@ import pandas as pd from six import text_type from catalyst.data import bundles as bundles_module +from catalyst.exchange.exchange_bundle import ExchangeBundle from catalyst.utils.cli import Date, Timestamp from catalyst.utils.run_algo import _run, load_extensions @@ -39,7 +40,6 @@ except NameError: help="Don't load the default catalyst extension.py file in $ZIPLINE_HOME.", ) @click.version_option() - def main(extension, strict_extensions, default_extension): """Top level catalyst entry point. """ @@ -238,7 +238,7 @@ def run(ctx, # does not pass either of these and then passes the first only # to be told they need to pass the second argument also ctx.fail( - "must specify dates with '-s' / '--start' and '-e' / '--end'", + "must specify dates with '-s' / '--start' and '-e' / '--end'", ) if start is None: ctx.fail("must specify a start date with '-s' / '--start'") @@ -246,7 +246,7 @@ def run(ctx, ctx.fail("must specify an end date with '-e' / '--end'") if exchange_name is None: - ctx.fail("must specify an exchange name '-x'") + ctx.fail("must specify an exchange name '-x'") perf = _run( initialize=None, @@ -307,13 +307,14 @@ def catalyst_magic(line, cell=None): '%s%%catalyst' % ((cell or '') and '%'), # don't use system exit and propogate errors to the caller standalone_mode=False, - ) + ) except SystemExit as e: # https://github.com/mitsuhiko/click/pull/533 # even in standalone_mode=False `--help` really wants to kill us ;_; if e.code: raise ValueError('main returned non-zero status code: %d' % e.code) + @main.command() @click.option( '-f', @@ -380,33 +381,32 @@ def catalyst_magic(line, cell=None): default=False, help='Display live graph.', ) - @click.pass_context def live(ctx, - algofile, - algotext, - define, - output, - print_algo, - local_namespace, - exchange_name, - algo_namespace, - base_currency, - live_graph): + algofile, + algotext, + define, + output, + print_algo, + local_namespace, + exchange_name, + algo_namespace, + base_currency, + live_graph): """Trade live with the given algorithm. """ if (algotext is not None) == (algofile is not None): ctx.fail( "must specify exactly one of '-f' / '--algofile' or" " '-t' / '--algotext'", - ) + ) if exchange_name is None: - ctx.fail("must specify an exchange name '-x'") + ctx.fail("must specify an exchange name '-x'") if algo_namespace is None: - ctx.fail("must specify an algorithm name '-n' in live execution mode") + ctx.fail("must specify an algorithm name '-n' in live execution mode") if base_currency is None: - ctx.fail("must specify a base currency '-c' in live execution mode") + ctx.fail("must specify a base currency '-c' in live execution mode") perf = _run( initialize=None, @@ -442,15 +442,74 @@ def live(ctx, return perf +@main.command() +@click.option( + '-x', + '--exchange-name', + type=click.Choice({'bitfinex', 'bittrex', 'poloniex'}), + help='The name of the exchange bundle to ingest (supported: bitfinex,' + ' bittrex, poloniex).', +) +@click.option( + '--data-frequency', + type=click.Choice({'daily', 'minute', 'daily,minute'}), + default='daily', + show_default=True, + help='The data frequency of the desired OHLCV bars.', +) +@click.option( + '-s', + '--start', + default=None, + type=Date(tz='utc', as_timestamp=True), + help='The start date of the data range. (default: one year from end date)', +) +@click.option( + '-e', + '--end', + default=None, + type=Date(tz='utc', as_timestamp=True), + help='The end date of the data range. (default: today)', +) +@click.option( + '--show-progress/--no-show-progress', + default=True, + help='Print progress information to the terminal.' +) +def ingest_exchange(exchange_name, data_frequency, start, end, + show_progress): + """ + Ingest data for the given exchange. + """ + click.echo('ingesting exchange bundle {}'.format(exchange_name)) + exchange_bundle = ExchangeBundle( + exchange_name=exchange_name, + data_frequency=data_frequency, + include_symbols=None, + exclude_symbols=None, + start=start, + end=end, + show_progress=show_progress + ) + exchange_bundle.ingest() + + @main.command() @click.option( '-b', '--bundle', - default='poloniex', metavar='BUNDLE-NAME', - show_default=True, + default=None, + show_default=False, help='The data bundle to ingest.', ) +@click.option( + '-x', + '--exchange-name', + type=click.Choice({'bitfinex', 'bittrex', 'poloniex'}), + help='The name of the exchange bundle to ingest (supported: bitfinex,' + ' bittrex, poloniex).', +) @click.option( '-c', '--compile-locally', @@ -469,9 +528,12 @@ def live(ctx, default=True, help='Print progress information to the terminal.' ) -def ingest(bundle, compile_locally, assets_version, show_progress): +@click.pass_context +def ingest(ctx, bundle, exchange_name, compile_locally, assets_version, + show_progress): """Ingest the data for the given bundle. """ + bundles_module.ingest( bundle, os.environ, @@ -491,6 +553,13 @@ def ingest(bundle, compile_locally, assets_version, show_progress): show_default=True, help='The data bundle to clean.', ) +@click.option( + '-x', + '--exchange_name', + metavar='EXCHANGE-NAME', + show_default=True, + help='The exchange bundle name to clean.', +) @click.option( '-e', '--before', @@ -523,6 +592,7 @@ def clean(bundle, before, after, keep_last): keep_last, ) + @main.command() def bundles(): """List all of the available data bundles. diff --git a/catalyst/data/loader.py b/catalyst/data/loader.py index a9dd8167..48fd2b13 100644 --- a/catalyst/data/loader.py +++ b/catalyst/data/loader.py @@ -12,30 +12,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime import os from collections import OrderedDict import logbook import pandas as pd -import numpy as np -from pandas_datareader.data import DataReader -import datetime -import time import pytz +from pandas_datareader.data import DataReader from six import iteritems from six.moves.urllib_error import HTTPError -from .benchmarks import get_benchmark_returns +from catalyst.utils.calendars import get_calendar from . import treasuries, treasuries_can +from .benchmarks import get_benchmark_returns +from ..utils.deprecate import deprecated from ..utils.paths import ( cache_root, data_root, ) -from ..utils.deprecate import deprecated - -from catalyst.data.bundles.poloniex import PoloniexBundle -from catalyst.utils.calendars import get_calendar - logger = logbook.Logger('Loader') @@ -308,20 +303,21 @@ def ensure_crypto_benchmark_data(symbol, ('Downloading benchmark data for {symbol!r} from {first_date} to {last_date}'), symbol=symbol, first_date=first_date, last_date=last_date) + raise DeprecationWarning('poloniex bundle deprecated') # Load benchmark symbol from Poloniex API - try: - bundle = PoloniexBundle() - bench_raw = bundle._fetch_symbol_frame( - None, - symbol, - get_calendar(bundle.calendar_name), - first_date - trading_day, - last_date, - 'daily', - ) - except (OSError, IOError, HTTPError): - logger.exception('Failed to fetch new crypto benchmark returns') - raise + # try: + # bundle = PoloniexBundle() + # bench_raw = bundle._fetch_symbol_frame( + # None, + # symbol, + # get_calendar(bundle.calendar_name), + # first_date - trading_day, + # last_date, + # 'daily', + # ) + # except (OSError, IOError, HTTPError): + # logger.exception('Failed to fetch new crypto benchmark returns') + # raise # select close column and compute percent change between days daily_close = bench_raw[['close']] diff --git a/catalyst/examples/buy_low_sell_high_neo_with_interface.py b/catalyst/examples/buy_low_sell_high_neo_with_interface.py index 4a9cc4c5..2e216cb5 100644 --- a/catalyst/examples/buy_low_sell_high_neo_with_interface.py +++ b/catalyst/examples/buy_low_sell_high_neo_with_interface.py @@ -148,27 +148,27 @@ def analyze(context, stats): pass -run_algorithm( - initialize=initialize, - handle_data=handle_data, - analyze=analyze, - exchange_name='bitfinex', - live=True, - algo_namespace=algo_namespace, - base_currency='btc', - live_graph=False -) - -# Backtest # run_algorithm( -# capital_base=250, -# start=pd.to_datetime('2017-09-08', utc=True), -# end=pd.to_datetime('2017-09-15', utc=True), -# data_frequency='minute', # initialize=initialize, # handle_data=handle_data, # analyze=analyze, # exchange_name='bitfinex', +# live=True, # algo_namespace=algo_namespace, -# base_currency='btc' +# base_currency='btc', +# live_graph=False # ) + +# Backtest +run_algorithm( + capital_base=250, + start=pd.to_datetime('2017-09-08', utc=True), + end=pd.to_datetime('2017-09-15', utc=True), + data_frequency='minute', + initialize=initialize, + handle_data=handle_data, + analyze=analyze, + exchange_name='bitfinex', + algo_namespace=algo_namespace, + base_currency='btc' +) diff --git a/catalyst/exchange/data_api.py b/catalyst/exchange/bundle_utils.py similarity index 74% rename from catalyst/exchange/data_api.py rename to catalyst/exchange/bundle_utils.py index 717dbe82..eca43e6c 100644 --- a/catalyst/exchange/data_api.py +++ b/catalyst/exchange/bundle_utils.py @@ -1,4 +1,16 @@ import datetime +from logging import Logger, DEBUG +import os +from dateutil.relativedelta import relativedelta +import pandas as pd + +from catalyst import get_calendar +from catalyst.data.minute_bars import BcolzMinuteBarWriter +from catalyst.data.us_equity_pricing import BcolzDailyBarWriter +from catalyst.exchange.exchange_utils import get_exchange_folder +from catalyst.utils.paths import data_root, ensure_directory + +log = Logger('test_exchange_bundle') def get_date_from_ms(ms): @@ -30,7 +42,7 @@ def get_history_mock(exchange_name, data_frequency, symbol, start_ms, end_ms, Notes ===== - Using milliseconds for the start and end dates for ease of use in the + Using milliseconds for the start and end dates for ease of use in URL query parameters. Sometimes, one minute goes by without completing a trade of the given @@ -71,3 +83,15 @@ def get_history_mock(exchange_name, data_frequency, symbol, start_ms, end_ms, last_traded=candle['last_traded'] )) return ohlcv + + +def fetch_candles_chunk(exchange, assets, data_frequency, end_dt, bar_count): + calc_start_dt = end_dt - datetime.timedelta(minutes=bar_count) + candles = exchange.get_candles( + data_frequency=data_frequency, + assets=assets, + bar_count=bar_count, + start_dt=calc_start_dt, + end_dt=end_dt + ) + return candles diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index 0c4d7163..9d1afe45 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -1,5 +1,4 @@ import abc -import collections import random from abc import ABCMeta, abstractmethod, abstractproperty from datetime import timedelta diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 9c3cccc1..668b3e57 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -1,16 +1,19 @@ +import os from datetime import timedelta import pandas as pd -import numpy as np -from logbook import Logger, INFO +from logbook import Logger from catalyst import get_calendar -from catalyst.data.minute_bars import BcolzMinuteOverlappingData -from catalyst.exchange.bitfinex.bitfinex import Bitfinex -from catalyst.exchange.bittrex.bittrex import Bittrex -from catalyst.exchange.exchange_errors import ExchangeNotFoundError -from catalyst.exchange.exchange_utils import get_exchange_auth +from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \ + BcolzMinuteBarWriter, BcolzMinuteBarReader +from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \ + BcolzDailyBarReader +from catalyst.exchange.bundle_utils import fetch_candles_chunk +from catalyst.exchange.exchange_utils import get_exchange_folder +from catalyst.exchange.init_utils import get_exchange from catalyst.utils.cli import maybe_show_progress +from catalyst.utils.paths import ensure_directory def _cachpath(symbol, type_): @@ -20,251 +23,33 @@ def _cachpath(symbol, type_): log = Logger('exchange_bundle') -def fetch_candles_chunk(exchange, assets, data_frequency, end_dt, bar_count): - calc_start_dt = end_dt - timedelta(minutes=bar_count) - candles = exchange.get_candles( - data_frequency=data_frequency, - assets=assets, - bar_count=bar_count, - start_dt=calc_start_dt, - end_dt=end_dt - ) - return candles +class ExchangeBundle: + def __init__(self, exchange_name, data_frequency, include_symbols=None, + exclude_symbols=None, start=None, end=None, + show_progress=True, environ=os.environ): + self.exchange = get_exchange(exchange_name) + self.data_frequency = data_frequency + self.assets = self.get_assets(include_symbols, exclude_symbols) + self.start, self.end = self.get_adj_dates(start, end) + self.environ = environ + self.show_progress = show_progress + self.minutes_per_day = 1440 + self.default_ohlc_ratio = 1000000 + self._writer = None + self._reader = None + def get_assets(self, include_symbols, exclude_symbols): + # TODO: filter assets + return self.exchange.assets -def process_bar_data(exchange, assets, writer, data_frequency, - show_progress, start, end): - open_calendar = get_calendar('OPEN') - - writer.default_ohlc_ratio = 1000000 - writer.calendar = open_calendar - writer.minutes_per_day = 1440 - writer.write_metadata = True - - delta = end - start - if data_frequency == 'minute': - delta_periods = delta.total_seconds() / 60 - frequency = '1m' - - elif data_frequency == 'daily': - delta_periods = delta.total_seconds() / 60 / 60 / 24 - frequency = '1d' - - else: - raise ValueError('frequency not supported') - - if delta_periods > exchange.num_candles_limit: - bar_count = exchange.num_candles_limit - - chunks = [] - last_chunk_date = end.floor('1 min') - while last_chunk_date > start + timedelta(minutes=bar_count): - # TODO: account for the partial last bar - chunk = dict(end=last_chunk_date, bar_count=bar_count) - chunks.append(chunk) - - # TODO: base on frequency - last_chunk_date = \ - last_chunk_date - timedelta(minutes=(bar_count + 1)) - - chunks.reverse() - - else: - chunks = [dict(end=end, bar_count=delta_periods)] - - with maybe_show_progress( - chunks, - show_progress, - label='Fetching {exchange} {frequency} candles: '.format( - exchange=exchange.name, - frequency=data_frequency - )) as it: - - previous_candle = dict() - for chunk in it: - chunk_end = chunk['end'] - chunk_start = chunk_end - timedelta(minutes=chunk['bar_count']) - - chunk_assets = [] - for asset in assets: - if asset.start_date <= chunk_end: - chunk_assets.append(asset) - - # TODO: ensure correct behavior for assets starting in the chunk - candles = fetch_candles_chunk( - exchange=exchange, - assets=chunk_assets, - data_frequency=frequency, - end_dt=chunk_end, - bar_count=chunk['bar_count'] - ) - log.debug('requests counter {}'.format(exchange.request_cpt)) - - num_candles = 0 - data = [] - for asset in candles: - asset_candles = candles[asset] - if not asset_candles: - log.debug( - 'no data: {symbols} on {exchange}, date {end}'.format( - symbols=chunk_assets, - exchange=exchange.name, - end=chunk_end - ) - ) - continue - - all_dates = [] - all_candles = [] - date = chunk_start - while date <= chunk_end: - - previous = previous_candle[asset] \ - if asset in previous_candle else None - - candle = next((candle for candle in asset_candles \ - if candle['last_traded'] == date), previous) - - if candle is not None: - all_dates.append(date) - all_candles.append(candle) - - previous_candle[asset] = candle - - date += timedelta(minutes=1) - - df = pd.DataFrame(all_candles, index=all_dates) - if not df.empty: - df.sort_index(inplace=True) - - sid = asset.sid - num_candles += len(df.values) - - data.append((sid, df)) - - try: - log.debug( - 'writing {num_candles} candles from {start} to {end}'.format( - num_candles=num_candles, - start=chunk_start, - end=chunk_end - ) - ) - - for pair in data: - log.debug('data for sid {}\n{}\n{}'.format( - pair[0], pair[1].head(2), pair[1].tail(2))) - - writer.write( - data=data, - show_progress=False, - invalid_data_behavior='raise' - ) - except BcolzMinuteOverlappingData as e: - log.warn('chunk already exists {}: {}'.format(chunk, e)) - - -def exchange_bundle(exchange_name, symbols=None, start=None, end=None, - log_level=INFO): - """Create a data bundle ingest function for the specified exchange. - - Parameters - ---------- - exchange_name: str - The name of the exchange - symbols : iterable[str] - The ticker symbols to load data for. - start : datetime, optional - The start date to query for. By default this pulls the full history - for the calendar. - end : datetime, optional - The end date to query for. By default this pulls the full history - for the calendar. - - Returns - ------- - ingest : callable - The bundle ingest function for the given set of symbols. - - Examples - -------- - This code should be added to ~/.catalyst/extension.py - - .. code-block:: python - - from catalyst.data.bundles import register - from catalyst.exchange.exchange_bundle import exchange_bundle - - symbols = ( - 'btc_usd', - 'eth_btc', - 'etc_btc', - 'neo_btc', - ) - register('exchange_bitfinex', exchange_bundle('bitfinex', symbols)) - - Notes - ----- - The sids for each symbol will be the index into the symbols sequence. - """ - # strict this in memory so that we can reiterate over it - log.level = log_level - - def ingest(environ, - asset_db_writer, - minute_bar_writer, - daily_bar_writer, - adjustment_writer, - calendar, - start_session, - end_session, - cache, - show_progress, - is_compile, - output_dir, - start=start, - end=end): - - log.info('ingesting bundle {}'.format(output_dir)) - - # TODO: I don't understand this session vs dates idea - if start is None: - start = start_session - if end is None: - end = end_session - + def get_adj_dates(self, start, end): now = pd.Timestamp.utcnow() if end > now: log.info('adjusting the end date to now {}'.format(now)) end = now - log.info('ingesting data from {} to {}'.format(start, end)) - - exchange_auth = get_exchange_auth(exchange_name) - if exchange_name == 'bitfinex': - exchange = Bitfinex( - key=exchange_auth['key'], - secret=exchange_auth['secret'], - base_currency=None, # TODO: make optional at the exchange - portfolio=None - ) - elif exchange_name == 'bittrex': - exchange = Bittrex( - key=exchange_auth['key'], - secret=exchange_auth['secret'], - base_currency=None, - portfolio=None - ) - else: - raise ExchangeNotFoundError(exchange_name=exchange_name) - - if symbols is not None: - assets = exchange.get_assets(symbols) - else: - assets = exchange.assets - earliest_trade = None - for asset in assets: + for asset in self.assets: if earliest_trade is None or earliest_trade > asset.start_date: earliest_trade = asset.start_date @@ -278,26 +63,213 @@ def exchange_bundle(exchange_name, symbols=None, start=None, end=None, if start >= end: raise ValueError('start date cannot be after end date') - # if daily_bar_writer is not None: - # process_bar_data( - # exchange=exchange, - # assets=assets, - # writer=daily_bar_writer, - # data_frequency='daily', - # show_progress=show_progress, - # start=start, - # end=end - # ) + return start, end - if minute_bar_writer is not None: - process_bar_data( - exchange=exchange, - assets=assets, - writer=minute_bar_writer, - data_frequency='minute', - show_progress=show_progress, - start=start, - end=end + @property + def reader(self): + if self._reader is not None: + return self._reader + + root = get_exchange_folder(self.exchange.name) + input_dir = '{root}/{frequency}_bundle'.format( + root=root, + frequency=self.data_frequency + ) + + if self.data_frequency == 'minute': + try: + self._reader = BcolzMinuteBarReader(input_dir) + except IOError: + log.debug('no reader data found in {}'.format(input_dir)) + + elif self.data_frequency == 'daily': + try: + self._reader = BcolzDailyBarReader(input_dir) + except IOError: + log.debug('no reader data found in {}'.format(input_dir)) + else: + raise ValueError( + 'invalid frequency {}'.format(self.data_frequency) ) - return ingest + return self._reader + + @property + def writer(self): + if self._writer is not None: + return self._writer + + open_calendar = get_calendar('OPEN') + + root = get_exchange_folder(self.exchange.name) + output_dir = '{root}/{frequency}_bundle'.format( + root=root, + frequency=self.data_frequency + ) + ensure_directory(output_dir) + + if self.data_frequency == 'minute': + if len(os.listdir(output_dir)) > 0: + self._writer = BcolzMinuteBarWriter.open(output_dir, self.end) + else: + self._writer = BcolzMinuteBarWriter( + rootdir=output_dir, + calendar=open_calendar, + minutes_per_day=self.minutes_per_day, + start_session=self.start, + end_session=self.end, + write_metadata=True, + default_ohlc_ratio=self.default_ohlc_ratio + ) + elif self.data_frequency == 'daily': + if len(os.listdir(output_dir)) > 0: + self._writer = BcolzDailyBarWriter.open(output_dir, self.end) + else: + self._writer = BcolzDailyBarWriter( + filename=output_dir, + calendar=open_calendar, + start_session=self.start, + end_session=self.end + ) + else: + raise ValueError( + 'invalid frequency {}'.format(self.data_frequency) + ) + + return self._writer + + def ingest(self): + symbols = [] + log.debug( + 'ingesting trading pairs {symbols} on exchange {exchange} ' + 'from {start} to {end}'.format( + symbols=symbols, + exchange=self.exchange.name, + start=self.start, + end=self.end + ) + ) + + delta = self.end - self.start + if self.data_frequency == 'minute': + delta_periods = delta.total_seconds() / 60 + frequency = '1m' + + elif self.data_frequency == 'daily': + delta_periods = delta.total_seconds() / 60 / 60 / 24 + frequency = '1d' + + else: + raise ValueError('frequency not supported') + + if delta_periods > self.exchange.num_candles_limit: + bar_count = self.exchange.num_candles_limit + + chunks = [] + last_chunk_date = self.end.floor('1 min') + while last_chunk_date > self.start + timedelta(minutes=bar_count): + # TODO: account for the partial last bar + chunk = dict(end=last_chunk_date, bar_count=bar_count) + chunks.append(chunk) + + # TODO: base on frequency + last_chunk_date = \ + last_chunk_date - timedelta(minutes=(bar_count + 1)) + + chunks.reverse() + + else: + chunks = [dict(end=self.end, bar_count=delta_periods)] + + with maybe_show_progress( + chunks, + self.show_progress, + label='Fetching {exchange} {frequency} candles: '.format( + exchange=self.exchange.name, + frequency=self.data_frequency + )) as it: + + previous_candle = dict() + for chunk in it: + chunk_end = chunk['end'] + chunk_start = chunk_end - timedelta(minutes=chunk['bar_count']) + + chunk_assets = [] + for asset in self.assets: + if asset.start_date <= chunk_end: + chunk_assets.append(asset) + + # TODO: ensure correct behavior for assets starting in the chunk + candles = fetch_candles_chunk( + exchange=self.exchange, + assets=chunk_assets, + data_frequency=frequency, + end_dt=chunk_end, + bar_count=chunk['bar_count'] + ) + log.debug( + 'requests counter {}'.format(self.exchange.request_cpt)) + + num_candles = 0 + data = [] + for asset in candles: + asset_candles = candles[asset] + if not asset_candles: + log.debug( + 'no data: {symbols} on {exchange}, date {end}'.format( + symbols=chunk_assets, + exchange=self.exchange.name, + end=chunk_end + ) + ) + continue + + all_dates = [] + all_candles = [] + date = chunk_start + while date <= chunk_end: + + previous = previous_candle[asset] \ + if asset in previous_candle else None + + candle = next((candle for candle in asset_candles \ + if candle['last_traded'] == date), + previous) + + if candle is not None: + all_dates.append(date) + all_candles.append(candle) + + previous_candle[asset] = candle + + date += timedelta(minutes=1) + + df = pd.DataFrame(all_candles, index=all_dates) + if not df.empty: + df.sort_index(inplace=True) + + sid = asset.sid + num_candles += len(df.values) + + data.append((sid, df)) + + try: + log.debug( + 'writing {num_candles} candles from {start} to {end}'.format( + num_candles=num_candles, + start=chunk_start, + end=chunk_end + ) + ) + + for pair in data: + log.debug('data for sid {}\n{}\n{}'.format( + pair[0], pair[1].head(2), pair[1].tail(2))) + + self.writer.write( + data=data, + show_progress=False, + invalid_data_behavior='raise' + ) + except BcolzMinuteOverlappingData as e: + log.warn('chunk already exists {}: {}'.format(chunk, e)) diff --git a/catalyst/exchange/exchange_utils.py b/catalyst/exchange/exchange_utils.py index 2d3729ed..c6884d73 100644 --- a/catalyst/exchange/exchange_utils.py +++ b/catalyst/exchange/exchange_utils.py @@ -3,6 +3,7 @@ import os import pickle import urllib from datetime import date, datetime + import pandas as pd from catalyst.exchange.exchange_errors import ExchangeAuthNotFound, \ diff --git a/catalyst/exchange/init_utils.py b/catalyst/exchange/init_utils.py new file mode 100644 index 00000000..446fe1cb --- /dev/null +++ b/catalyst/exchange/init_utils.py @@ -0,0 +1,24 @@ +from catalyst.exchange.bitfinex.bitfinex import Bitfinex +from catalyst.exchange.bittrex.bittrex import Bittrex +from catalyst.exchange.exchange_errors import ExchangeNotFoundError +from catalyst.exchange.exchange_utils import get_exchange_auth + + +def get_exchange(exchange_name): + exchange_auth = get_exchange_auth(exchange_name) + if exchange_name == 'bitfinex': + return Bitfinex( + key=exchange_auth['key'], + secret=exchange_auth['secret'], + base_currency=None, # TODO: make optional at the exchange + portfolio=None + ) + elif exchange_name == 'bittrex': + return Bittrex( + key=exchange_auth['key'], + secret=exchange_auth['secret'], + base_currency=None, + portfolio=None + ) + else: + raise ExchangeNotFoundError(exchange_name=exchange_name) diff --git a/tests/exchange/test_bitfinex.py b/tests/exchange/test_bitfinex.py index 7aaddc14..7efd4884 100644 --- a/tests/exchange/test_bitfinex.py +++ b/tests/exchange/test_bitfinex.py @@ -1,11 +1,9 @@ -from catalyst.exchange.bitfinex.bitfinex import Bitfinex -from base import BaseExchangeTestCase from logbook import Logger -from catalyst.finance.execution import (MarketOrder, - LimitOrder, - StopOrder, - StopLimitOrder) + +from base import BaseExchangeTestCase +from catalyst.exchange.bitfinex.bitfinex import Bitfinex from catalyst.exchange.exchange_utils import get_exchange_auth +from catalyst.finance.execution import (LimitOrder) log = Logger('test_bitfinex') diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 6856970c..0441f4a6 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -1,13 +1,8 @@ -import os -from datetime import timedelta -from logging import Logger, DEBUG +from logging import Logger import pandas as pd -from catalyst import get_calendar -from catalyst.data.minute_bars import BcolzMinuteBarWriter -from catalyst.exchange.exchange_bundle import exchange_bundle -from catalyst.utils.paths import ensure_directory, data_root +from catalyst.exchange.exchange_bundle import ExchangeBundle log = Logger('test_exchange_bundle') @@ -18,50 +13,15 @@ class ExchangeBundleTestCase: start = pd.to_datetime('2017-09-01', utc=True) end = pd.Timestamp.utcnow() - open_calendar = get_calendar('OPEN') - root = data_root(os.environ) - output_dir = '{root}/exchange_{exchange}/2017-09-21T05;34;37.274482'.format( - root=root, - exchange=exchange_name - ) - ensure_directory(output_dir) - - filename = os.path.join(output_dir, 'metadata.json') - - start_session = start.floor('1d') - if os.path.isfile(filename): - minute_bar_writer = BcolzMinuteBarWriter.open(output_dir, end) - else: - # TODO: need to be able to write more precise numbers - minute_bar_writer = BcolzMinuteBarWriter( - rootdir=output_dir, - calendar=open_calendar, - minutes_per_day=1440, - start_session=start_session, - end_session=end, - write_metadata=True, - default_ohlc_ratio=1000000 - ) - - ingest = exchange_bundle( + log.info('ingesting exchange bundle {}'.format(exchange_name)) + exchange_bundle = ExchangeBundle( exchange_name=exchange_name, - symbols=['eth_btc'], - log_level=DEBUG + data_frequency='minute', + include_symbols=None, + exclude_symbols=None, + start=start, + end=end, + show_progress=True ) - - ingest(environ=os.environ, - asset_db_writer=None, - minute_bar_writer=minute_bar_writer, - daily_bar_writer=None, - adjustment_writer=None, - calendar=open_calendar, - start_session=start_session, - end_session=end, - cache=dict(), - show_progress=True, - is_compile=False, - output_dir=output_dir, - start=start, - end=end) pass diff --git a/tests/exchange/test_data_portal.py b/tests/exchange/test_data_portal.py index 2b881541..8d8f90e6 100644 --- a/tests/exchange/test_data_portal.py +++ b/tests/exchange/test_data_portal.py @@ -1,19 +1,13 @@ -from datetime import timedelta - -import os import pandas as pd -from catalyst import get_calendar from logbook import Logger -from catalyst.data.minute_bars import BcolzMinuteBarReader +from catalyst import get_calendar from catalyst.exchange.asset_finder_exchange import AssetFinderExchange from catalyst.exchange.bitfinex.bitfinex import Bitfinex from catalyst.exchange.bittrex.bittrex import Bittrex from catalyst.exchange.data_portal_exchange import DataPortalExchangeBacktest, \ DataPortalExchangeLive -from catalyst.exchange.exchange_bundle import exchange_bundle from catalyst.exchange.exchange_utils import get_exchange_auth -from catalyst.utils.run_algo import load_extensions log = Logger('test_bitfinex')