Merge branch 'develop' of github.com:enigmampc/catalyst into develop

This commit is contained in:
Victor Grau Serrat
2017-10-31 00:05:55 -06:00
6 changed files with 446 additions and 138 deletions
+37 -12
View File
@@ -103,34 +103,59 @@ def get_start_dt(end_dt, bar_count, data_frequency):
return start_dt
def get_month_start_end(dt):
def get_period_label(dt, data_frequency):
"""
Returns the first and last day of the month for the specified date.
The period label for the specified date and frequency.
:param dt:
:param data_frequency:
:return:
"""
return '{}-{:02d}'.format(dt.year, dt.month) if data_frequency == 'minute' \
else '{}'.format(dt.year)
def get_month_start_end(dt, first_day=None, last_day=None):
"""
The first and last day of the month for the specified date.
:param dt:
:param first_day
:param last_day
:return:
"""
month_range = calendar.monthrange(dt.year, dt.month)
month_start = pd.to_datetime(datetime(
dt.year, dt.month, 1, 0, 0, 0, 0
), utc=True)
month_end = pd.to_datetime(datetime(
dt.year, dt.month, month_range[1], 23, 59, 0, 0
), utc=True)
if first_day:
month_start = first_day
else:
month_start = pd.to_datetime(datetime(
dt.year, dt.month, 1, 0, 0, 0, 0
), utc=True)
if last_day:
month_end = last_day
else:
month_end = pd.to_datetime(datetime(
dt.year, dt.month, month_range[1], 23, 59, 0, 0
), utc=True)
return month_start, month_end
def get_year_start_end(dt):
def get_year_start_end(dt, first_day=None, last_day=None):
"""
Returns the first and last day of the year for the specified date.
The first and last day of the year for the specified date.
:param dt:
:param first_day
:param last_day
:return:
"""
year_start = pd.to_datetime(date(dt.year, 1, 1), utc=True)
year_end = pd.to_datetime(date(dt.year, 12, 31), utc=True)
year_start = first_day if first_day \
else pd.to_datetime(date(dt.year, 1, 1), utc=True)
year_end = last_day if last_day \
else pd.to_datetime(date(dt.year, 12, 31), utc=True)
return year_start, year_end
+146 -118
View File
@@ -1,9 +1,14 @@
import os
import os
import shutil
from datetime import timedelta
from itertools import chain
import pandas as pd
from catalyst.assets._assets import TradingPair
from logbook import Logger
from pandas.tslib import Timestamp
from pytz import UTC
from six import itervalues
from catalyst import get_calendar
from catalyst.constants import LOG_LEVEL
@@ -11,11 +16,11 @@ from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \
BcolzMinuteBarMetadata
from catalyst.exchange.bundle_utils import range_in_bundle, \
get_bcolz_chunk, get_delta, get_month_start_end, \
get_year_start_end, get_df_from_arrays, get_start_dt
get_year_start_end, get_df_from_arrays, get_start_dt, get_period_label
from catalyst.exchange.exchange_bcolz import BcolzExchangeBarReader, \
BcolzExchangeBarWriter
from catalyst.exchange.exchange_errors import EmptyValuesInBundleError, \
InvalidHistoryFrequencyError, TempBundleNotFoundError, \
TempBundleNotFoundError, \
NoDataAvailableOnExchange, \
PricingDataNotLoadedError
from catalyst.exchange.exchange_utils import get_exchange_folder
@@ -383,7 +388,7 @@ class ExchangeBundle:
"""
reader = self.get_reader(data_frequency)
chunks = []
chunks = dict()
for asset in assets:
try:
# Checking if the the asset has price data in the specified
@@ -397,98 +402,57 @@ class ExchangeBundle:
log.debug('skipping {}: {}'.format(asset.symbol, e))
continue
# This is either the first trading day of the asset or the
# first session available in the calendar
first_trading_dt = asset.start_date \
if asset.start_date > self.calendar.first_session \
else self.calendar.first_session
dates = pd.date_range(
start=get_period_label(adj_start, data_frequency),
end=get_period_label(adj_end, data_frequency),
freq='MS' if data_frequency == 'minute' else 'AS',
tz=UTC
)
# Aligning start / end dates with the daily calendar
sessions = self.calendar.sessions_in_range(adj_start, adj_end)
# Adjusting the last date of the range to avoid
# going over the asset's trading bounds
dates.values[0] = adj_start
dates.values[-1] = adj_end
# We loop through each session to create chunks for each period
chunk_labels = []
dt = sessions[0]
while dt <= sessions[-1]:
label = '{}-{:02d}'.format(dt.year, dt.month) \
if data_frequency == 'minute' else '{}'.format(dt.year)
chunks[asset] = []
for index, dt in enumerate(dates):
get_start_end = get_month_start_end \
if data_frequency == 'minute' else get_year_start_end
if label not in chunk_labels:
chunk_labels.append(label)
period_start, period_end = get_start_end(
dt=dt,
first_day=dt if index == 0 else None,
last_day=dt if index == len(dates) - 1 else None
)
# Adjusting the period dates to match the availability
# of the trading pair
if data_frequency == 'minute':
period_start, period_end = get_month_start_end(dt)
# Currencies don't always start trading at midnight.
# Checking the last minute of the day instead.
range_start = period_start.replace(hour=23, minute=59) \
if data_frequency == 'minute' else period_start
asset_start_month, _ = get_month_start_end(
first_trading_dt
# Checking if the data already exists in the bundle
# for the date range of the chunk. If not, we create
# a chunk for ingestion.
has_data = range_in_bundle(
asset, range_start, period_end, reader
)
if not has_data:
chunks[asset].append(
dict(
asset=asset,
period_start=period_start,
period_end=period_end,
period=get_period_label(dt, data_frequency)
)
if asset_start_month == period_start \
and period_start < first_trading_dt:
period_start = first_trading_dt
# TODO: need to filter closed pairs?
_, asset_end_month = get_month_start_end(
asset.end_minute
)
if asset_end_month == period_end \
and period_end > asset.end_minute:
period_end = asset.end_minute
elif data_frequency == 'daily':
period_start, period_end = get_year_start_end(dt)
asset_start_year, _ = get_year_start_end(
first_trading_dt
)
if asset_start_year == period_start \
and period_start < first_trading_dt:
period_start = first_trading_dt
_, asset_end_year = get_year_start_end(
asset.end_daily
)
if asset_end_year == period_end \
and period_end > asset.end_daily:
period_end = asset.end_daily
else:
raise InvalidHistoryFrequencyError(
frequency=data_frequency
)
# Currencies don't always start trading at midnight.
# Checking the last minute of the day instead.
range_start = period_start.replace(hour=23, minute=59) \
if data_frequency == 'minute' else period_start
# Checking if the data already exists in the bundle
# for the date range of the chunk. If not, we create
# a chunk for ingestion.
has_data = range_in_bundle(
asset, range_start, period_end, reader
)
if not has_data:
log.debug('adding period: {}'.format(label))
chunks.append(
dict(
asset=asset,
period_start=period_start,
period_end=period_end,
period=label
)
)
dt += timedelta(days=1)
# We sort the chunks by end date to ingest most recent data first
chunks.sort(key=lambda chunk: chunk['period_end'])
# We sort the chunks by end date to ingest most recent data first
chunks[asset].sort(key=lambda chunk: chunk['period_end'])
return chunks
def ingest_assets(self, assets, start_dt, end_dt, data_frequency,
show_progress=False):
def ingest_assets(self, assets, data_frequency, start_dt=None, end_dt=None,
show_progress=False, asset_chunks=False):
"""
Determine if data is missing from the bundle and attempt to ingest it.
@@ -497,6 +461,16 @@ class ExchangeBundle:
:param end_dt:
:return:
"""
if start_dt is None:
start_dt = self.calendar.first_session
if end_dt is None:
end_dt = pd.Timestamp.utcnow()
start_dt, end_dt = self.get_adj_dates(
start_dt, end_dt, assets, data_frequency
)
chunks = self.prepare_chunks(
assets=assets,
data_frequency=data_frequency,
@@ -507,7 +481,8 @@ class ExchangeBundle:
# Since chunks are either monthly or yearly, it is possible that
# our ingestion data range is greater than specified. We adjust
# the boundaries to ensure that the writer can write all data.
for chunk in chunks:
all_chunks = list(chain.from_iterable(itervalues(chunks)))
for chunk in all_chunks:
if chunk['period_start'] < start_dt:
start_dt = chunk['period_start']
@@ -515,24 +490,49 @@ class ExchangeBundle:
end_dt = chunk['period_end']
writer = self.get_writer(start_dt, end_dt, data_frequency)
with maybe_show_progress(
chunks,
show_progress,
label='Fetching {exchange} {frequency} candles: '.format(
exchange=self.exchange.name,
frequency=data_frequency
)) as it:
for chunk in it:
self.ingest_ctable(
asset=chunk['asset'],
data_frequency=data_frequency,
period=chunk['period'],
start_dt=chunk['period_start'],
end_dt=chunk['period_end'],
writer=writer,
empty_rows_behavior='strip',
cleanup=True
)
if asset_chunks:
for asset in chunks:
with maybe_show_progress(
chunks[asset],
show_progress,
label='Ingesting {frequency} price data for '
'{symbol} on {exchange}'.format(
exchange=self.exchange.name,
frequency=data_frequency,
symbol=asset.symbol
)) as it:
for chunk in it:
self.ingest_ctable(
asset=chunk['asset'],
data_frequency=data_frequency,
period=chunk['period'],
start_dt=chunk['period_start'],
end_dt=chunk['period_end'],
writer=writer,
empty_rows_behavior='strip',
cleanup=True
)
else:
with maybe_show_progress(
all_chunks,
show_progress,
label='Ingesting {frequency} price data on '
'{exchange}'.format(
exchange=self.exchange.name,
frequency=data_frequency,
)) as it:
for chunk in it:
self.ingest_ctable(
asset=chunk['asset'],
data_frequency=data_frequency,
period=chunk['period'],
start_dt=chunk['period_start'],
end_dt=chunk['period_end'],
writer=writer,
empty_rows_behavior='strip',
cleanup=True
)
def ingest(self, data_frequency, include_symbols=None,
exclude_symbols=None, start=None, end=None,
@@ -549,20 +549,30 @@ class ExchangeBundle:
:return:
"""
assets = self.get_assets(include_symbols, exclude_symbols)
start_dt, end_dt = self.get_adj_dates(
start, end, assets, data_frequency
)
for frequency in data_frequency.split(','):
self.ingest_assets(assets, start_dt, end_dt, frequency,
self.ingest_assets(assets, frequency, start, end,
show_progress)
def get_history_window_series_and_load(self,
assets,
end_dt,
bar_count,
field,
data_frequency):
assets, # type: List[TradingPair]
end_dt, # type: Timestamp
bar_count, # type: int
field, # type: str
data_frequency, # type: str
algo_end_dt=None # type: Timestamp
):
# type: (...) -> Dict[str, Series]
"""
Retrieve price data history, ingest missing data.
:param assets:
:param end_dt:
:param bar_count:
:param field:
:param data_frequency:
:return:
"""
try:
series = self.get_history_window_series(
assets=assets,
@@ -586,9 +596,10 @@ class ExchangeBundle:
self.ingest_assets(
assets=assets,
start_dt=start_dt,
end_dt=end_dt,
end_dt=algo_end_dt,
data_frequency=data_frequency,
show_progress=True
show_progress=True,
asset_chunks=True
)
series = self.get_history_window_series(
assets=assets,
@@ -596,12 +607,29 @@ class ExchangeBundle:
bar_count=bar_count,
field=field,
data_frequency=data_frequency,
reset_reader=True
reset_reader=False
)
return series
def get_spot_values(self, assets, field, dt, data_frequency,
reset_reader=False):
def get_spot_values(self,
assets, # type: List[TradingPair]
field, # type: str
dt, # type: Timestamp
data_frequency, # type: str
reset_reader=False # type: bool
):
# type: (...) -> List[float]
"""
The spot values for the gives assets, field and date. Reads from
the exchange data bundle.
:param assets:
:param field:
:param dt:
:param data_frequency:
:param reset_reader:
:return:
"""
values = []
try:
reader = self.get_reader(data_frequency)
+22 -4
View File
@@ -301,7 +301,7 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
:param ffill:
:return:
"""
bundle = self.exchange_bundles[exchange.name]
bundle = self.exchange_bundles[exchange.name] # type: ExchangeBundle
candle_size, unit, data_frequency = get_frequency(
frequency, data_frequency
@@ -313,14 +313,32 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase):
end_dt=end_dt,
bar_count=adj_bar_count,
field=field,
data_frequency=data_frequency
data_frequency=data_frequency,
algo_end_dt=self._last_available_session,
)
df = resample_history_df(pd.DataFrame(series), candle_size, field)
return df
def get_exchange_spot_value(self, exchange, assets, field, dt,
data_frequency):
def get_exchange_spot_value(self,
exchange, # type: Exchange
assets, # type: List[TradingPair]
field, # type: str
dt, # type: Timestamp
data_frequency # type: str
):
# type: (...) -> float
"""
A spot value for the exchange bundle. Try to ingest data if not in
the bundle.
:param exchange:
:param assets:
:param field:
:param dt:
:param data_frequency:
:return:
"""
bundle = self.exchange_bundles[exchange.name]
if data_frequency == 'daily':
+40
View File
@@ -1,4 +1,44 @@
import pandas as pd
import numpy as np
def crossover(source, target):
"""
The `x`-series is defined as having crossed over `y`-series if the value
of `x` is greater than the value of `y` and the value of `x` was less than
the value of `y` on the bar immediately preceding the current bar.
:param source:
:param target:
:return:
"""
if source[-1] is np.nan or source[-2] is np.nan \
or target[-1] is np.nan or target[-2] is np.nan:
return False
if source[-1] > target[-1] and source[-2] < target[-2]:
return True
else:
return False
def crossunder(source, target):
"""
The `x`-series is defined as having crossed under `y`-series if the value
of `x` is less than the value of `y` and the value of `x` was greater than
the value of `y` on the bar immediately preceding the current bar.
:param source:
:param target:
:return:
"""
if source[-1] is np.nan or source[-2] is np.nan \
or target[-1] is np.nan or target[-2] is np.nan:
return False
if source[-1] < target[-1] and source[-2] > target[-2]:
return True
else:
return False
def get_pretty_stats(stats_df, recorded_cols=None, num_rows=10):
+140
View File
@@ -0,0 +1,140 @@
"""
Requires Catalyst version 0.3.0 or above
Tested on Catalyst version 0.3.2
These example aims to provide and easy way for users to learn how to collect data from the different exchanges.
You simply need to specify the exchange and the market that you want to focus on.
You will all see how to create a universe and filter it base on the exchange and the market you desire.
The example prints out the closing price of all the pairs for a given market-exchange every 30 minutes.
The example also contains the ohlcv minute data for the past seven days which could be used to create indicators
Use this as the backbone to create your own trading strategies.
Variables lookback date and date are used to ensure data for a coin existed on the lookback period specified.
"""
import numpy as np
import pandas as pd
from datetime import timedelta
from catalyst import run_algorithm
from catalyst.exchange.exchange_utils import get_exchange_symbols
from catalyst.api import (
symbols,
)
def initialize(context):
context.i = -1 # counts the minutes
context.exchange = 'poloniex' # must match the exchange specified in run_algorithm
context.base_currency = 'eth' # must match the base currency specified in run_algorithm
def handle_data(context, data):
lookback = 60 * 24 * 7 # (minutes, hours, days) of how far to lookback in the data history
context.i += 1
# current date formatted into a string
today = context.blotter.current_dt
date, time = today.strftime('%Y-%m-%d %H:%M:%S').split(' ')
lookback_date = today - timedelta(days=(
lookback / (60 * 24))) # subtract the amount of days specified in lookback
lookback_date = lookback_date.strftime('%Y-%m-%d %H:%M:%S').split(' ')[
0] # get only the date as a string
# update universe everyday
new_day = 60 * 24
if not context.i % new_day:
context.universe = universe(context, lookback_date, date)
# get data every 30 minutes
minutes = 30
if not context.i % minutes and context.universe:
# we iterate for every pair in the current universe
for coin in context.coins:
pair = str(coin.symbol)
# 30 minute interval ohlcv data (the standard data required for candlestick or indicators/signals)
# 30T means 30 minutes re-sampling of one minute data. change to your desire time interval.
open = fill(data.history(coin, 'open', bar_count=lookback,
frequency='1m')).resample('30T').first()
high = fill(data.history(coin, 'high', bar_count=lookback,
frequency='1m')).resample('30T').max()
low = fill(data.history(coin, 'low', bar_count=lookback,
frequency='1m')).resample('30T').min()
close = fill(data.history(coin, 'price', bar_count=lookback,
frequency='1m')).resample('30T').last()
volume = fill(data.history(coin, 'volume', bar_count=lookback,
frequency='1m')).resample('30T').sum()
# close[-1] is the equivalent to current price
# displays the minute price for each pair every 30 minutes
print(
today, pair, open[-1], high[-1], low[-1], close[-1], volume[-1])
# ----------------------------------------------------------------------------------------------------------
# -------------------------------------- Insert Your Strategy Here -----------------------------------------
# ----------------------------------------------------------------------------------------------------------
def analyze(context=None, results=None):
pass
# Get the universe for a given exchange and a given base_currency market
# Example: Poloniex BTC Market
def universe(context, lookback_date, current_date):
json_symbols = get_exchange_symbols(
context.exchange) # get all the pairs for the exchange
universe_df = pd.DataFrame.from_dict(json_symbols).transpose().astype(
str) # convert into a dataframe
universe_df['base_currency'] = universe_df.apply(
lambda row: row.symbol.split('_')[1],
axis=1)
universe_df['market_currency'] = universe_df.apply(
lambda row: row.symbol.split('_')[0],
axis=1)
# Filter all the exchange pairs to only the ones for a give base currency
universe_df = universe_df[
universe_df['base_currency'] == context.base_currency]
# Filter all the pairs to ensure that pair existed in the current date range
universe_df = universe_df[universe_df.start_date < lookback_date]
universe_df = universe_df[universe_df.end_daily >= current_date]
context.coins = symbols(
*universe_df.symbol) # convert all the pairs to symbols
print(universe_df.head(), len(universe_df))
return universe_df.symbol.tolist()
# Replace all NA, NAN or infinite values with its nearest value
def fill(series):
if isinstance(series, pd.Series):
return series.replace([np.inf, -np.inf], np.nan).ffill().bfill()
elif isinstance(series, np.ndarray):
return pd.Series(series).replace([np.inf, -np.inf],
np.nan).ffill().bfill().values
else:
return series
if __name__ == '__main__':
start_date = pd.to_datetime('2017-01-01', utc=True)
end_date = pd.to_datetime('2017-10-15', utc=True)
performance = run_algorithm(start=start_date, end=end_date,
capital_base=10000.0,
initialize=initialize,
handle_data=handle_data,
analyze=analyze,
exchange_name='poloniex',
data_frequency='minute',
base_currency='eth',
live=False,
live_graph=False,
algo_namespace='simple_universe')
"""
Run in Terminal (inside catalyst environment):
python simple_universe.py
"""
+61 -4
View File
@@ -1,17 +1,20 @@
import hashlib
import tempfile
from logging import getLogger
import os
import pandas as pd
from catalyst import get_calendar
from catalyst.exchange.bundle_utils import get_bcolz_chunk, \
get_periods_range, get_start_dt
get_periods_range, get_start_dt, get_month_start_end, get_df_from_arrays, \
get_year_start_end
from catalyst.exchange.exchange_bcolz import BcolzExchangeBarReader, \
BcolzExchangeBarWriter
from catalyst.exchange.exchange_bundle import ExchangeBundle, \
BUNDLE_NAME_TEMPLATE
from catalyst.exchange.exchange_utils import get_exchange_folder
from catalyst.exchange.init_utils import get_exchange
from catalyst.exchange.factory import get_exchange
from catalyst.exchange.stats_utils import df_to_string
from catalyst.utils.paths import ensure_directory
@@ -45,11 +48,11 @@ class TestExchangeBundle:
exchange = get_exchange(exchange_name)
exchange_bundle = ExchangeBundle(exchange)
assets = [
exchange.get_asset('iot_btc')
exchange.get_asset('xmr_btc')
]
# start = pd.to_datetime('2017-09-01', utc=True)
start = pd.to_datetime('2017-9-01', utc=True)
start = pd.to_datetime('2016-01-01', utc=True)
end = pd.to_datetime('2017-9-30', utc=True)
log.info('ingesting exchange bundle {}'.format(exchange_name))
@@ -426,3 +429,57 @@ class TestExchangeBundle:
df = pd.DataFrame(bundle_series)
print('\n' + df_to_string(df))
pass
def bundle_to_csv(self):
exchange_name = 'poloniex'
data_frequency = 'daily'
period = '2016'
exchange = get_exchange(exchange_name)
bundle = ExchangeBundle(exchange)
asset = exchange.get_asset('xmr_btc')
path = get_bcolz_chunk(
exchange_name=exchange.name,
symbol=asset.symbol,
data_frequency=data_frequency,
period=period
)
dt = pd.to_datetime(period, utc=True)
if data_frequency == 'minute':
start_dt, end_dt = get_month_start_end(dt)
else:
start_dt, end_dt = get_year_start_end(dt)
reader = bundle.get_reader(data_frequency, path=path)
arrays = None
try:
arrays = reader.load_raw_arrays(
sids=[asset.sid],
fields=['open', 'high', 'low', 'close', 'volume'],
start_dt=start_dt,
end_dt=end_dt
)
except Exception as e:
log.warn('skipping ctable for {} from {} to {}: {}'.format(
asset.symbol, start_dt, end_dt, e
))
periods = bundle.get_calendar_periods_range(
start_dt, end_dt, data_frequency
)
df = get_df_from_arrays(arrays, periods)
folder = os.path.join(
tempfile.gettempdir(), 'catalyst', exchange.name, asset.symbol
)
ensure_directory(folder)
path = os.path.join(folder, period + '.csv')
log.info('creating csv file: {}'.format(path))
print('HEAD\n{}'.format(df.head(10)))
print('TAIL\n{}'.format(df.tail(10)))
df.to_csv(path)
pass