mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-04 16:46:45 +08:00
Integrating with history api
This commit is contained in:
@@ -395,6 +395,9 @@ cdef class TradingPair(Asset):
|
||||
cdef readonly float leverage
|
||||
cdef readonly object market_currency
|
||||
cdef readonly object base_currency
|
||||
cdef readonly object end_daily
|
||||
cdef readonly object end_minute
|
||||
cdef readonly object exchange_symbol
|
||||
|
||||
_kwargnames = frozenset({
|
||||
'sid',
|
||||
@@ -409,7 +412,10 @@ cdef class TradingPair(Asset):
|
||||
'leverage',
|
||||
'market_currency',
|
||||
'base_currency',
|
||||
'min_trade_size',
|
||||
'end_daily',
|
||||
'end_minute',
|
||||
'exchange_symbol',
|
||||
'min_trade_size'
|
||||
})
|
||||
def __init__(self,
|
||||
object symbol,
|
||||
@@ -418,7 +424,10 @@ cdef class TradingPair(Asset):
|
||||
object asset_name=None,
|
||||
int sid=0,
|
||||
float leverage=1.0,
|
||||
object end_daily=None,
|
||||
object end_minute=None,
|
||||
object end_date=None,
|
||||
object exchange_symbol=None,
|
||||
object first_traded=None,
|
||||
object auto_close_date=None,
|
||||
object exchange_full=None,
|
||||
@@ -474,7 +483,10 @@ cdef class TradingPair(Asset):
|
||||
:param asset_name:
|
||||
:param sid:
|
||||
:param leverage:
|
||||
:param end_daily
|
||||
:param end_minute
|
||||
:param end_date:
|
||||
:param exchange_symbol:
|
||||
:param first_traded:
|
||||
:param auto_close_date:
|
||||
:param exchange_full:
|
||||
@@ -516,6 +528,9 @@ cdef class TradingPair(Asset):
|
||||
)
|
||||
|
||||
self.leverage = leverage
|
||||
self.end_daily = end_daily
|
||||
self.end_minute = end_minute
|
||||
self.exchange_symbol = exchange_symbol
|
||||
|
||||
def __repr__(self):
|
||||
return 'Trading Pair {symbol}({sid}) Exchange: {exchange}, ' \
|
||||
@@ -523,7 +538,9 @@ cdef class TradingPair(Asset):
|
||||
'Market Currency: {market_currency}, ' \
|
||||
'Base Currency: {base_currency}, ' \
|
||||
'Exchange Leverage: {leverage}, ' \
|
||||
'Minimum Trade Size: {min_trade_size}'.format(
|
||||
'Minimum Trade Size: {min_trade_size} ' \
|
||||
'Last daily ingestion: {end_daily} ' \
|
||||
'Last minutely ingestion: {end_minute}'.format(
|
||||
symbol=self.symbol,
|
||||
sid=self.sid,
|
||||
exchange=self.exchange,
|
||||
@@ -531,7 +548,9 @@ cdef class TradingPair(Asset):
|
||||
market_currency=self.market_currency,
|
||||
base_currency=self.base_currency,
|
||||
leverage=self.leverage,
|
||||
min_trade_size=self.min_trade_size
|
||||
min_trade_size=self.min_trade_size,
|
||||
end_daily=self.end_daily,
|
||||
end_minute=self.end_minute
|
||||
)
|
||||
|
||||
cpdef __reduce__(self):
|
||||
|
||||
@@ -2,6 +2,8 @@ import datetime, requests
|
||||
import os
|
||||
from logging import Logger
|
||||
|
||||
import pytz
|
||||
|
||||
from catalyst.data.bundles import from_bundle_ingest_dirname
|
||||
from catalyst.utils.paths import data_path
|
||||
|
||||
@@ -15,8 +17,15 @@ def get_date_from_ms(ms):
|
||||
return datetime.datetime.fromtimestamp(ms / 1000.0)
|
||||
|
||||
|
||||
def get_history(exchange_name, data_frequency, symbol, start_ms=None,
|
||||
end_ms=None):
|
||||
def get_seconds_from_date(date):
|
||||
epoch = datetime.datetime.utcfromtimestamp(0)
|
||||
epoch = epoch.replace(tzinfo=pytz.UTC)
|
||||
|
||||
return int((date - epoch).total_seconds())
|
||||
|
||||
|
||||
def get_history(exchange_name, data_frequency, symbol, start_seconds=None,
|
||||
end_seconds=None):
|
||||
"""
|
||||
History API provides OHLCV data for any of the supported exchanges up to yesterday.
|
||||
|
||||
@@ -27,10 +36,10 @@ def get_history(exchange_name, data_frequency, symbol, start_ms=None,
|
||||
*** currently only 'daily' is supported ***
|
||||
:param symbol: string
|
||||
Required: The trading pair symbol.
|
||||
:param start: float
|
||||
Optional: The start date in milliseconds.
|
||||
:param end: float
|
||||
Optional: The end date in milliseconds.
|
||||
:param start_seconds: int
|
||||
Optional: The start date in seconds.
|
||||
:param end_seconds: int
|
||||
Optional: The end date in seconds.
|
||||
|
||||
:return ohlcv: list[dict[string, float]]
|
||||
Each row contains the following dictionary for the resulting bars:
|
||||
@@ -67,11 +76,11 @@ def get_history(exchange_name, data_frequency, symbol, start_ms=None,
|
||||
data_frequency=data_frequency,
|
||||
)
|
||||
|
||||
if start_ms:
|
||||
url += '&start={}'.format(int(start_ms / 1000))
|
||||
if start_seconds:
|
||||
url += '&start={}'.format(start_seconds)
|
||||
|
||||
if end_ms:
|
||||
url += '&end={}'.format(int(end_ms / 1000))
|
||||
if end_seconds:
|
||||
url += '&end={}'.format(end_seconds)
|
||||
|
||||
try:
|
||||
response = requests.get(url)
|
||||
@@ -80,7 +89,7 @@ def get_history(exchange_name, data_frequency, symbol, start_ms=None,
|
||||
|
||||
data = response.json()
|
||||
|
||||
if 'error' in response:
|
||||
if 'error' in data:
|
||||
raise ValueError(response['error'])
|
||||
|
||||
return data
|
||||
|
||||
@@ -226,6 +226,16 @@ class Exchange:
|
||||
else:
|
||||
min_trade_size = 0.0000001
|
||||
|
||||
if 'end_daily' in asset and asset['end_daily'] != 'N/A':
|
||||
end_daily = pd.to_datetime(asset['end_daily'], utc=True)
|
||||
else:
|
||||
end_daily = None
|
||||
|
||||
if 'end_minute' in asset and asset['end_minute'] != 'N/A':
|
||||
end_minute = pd.to_datetime(asset['end_minute'], utc=True)
|
||||
else:
|
||||
end_minute = None
|
||||
|
||||
trading_pair = TradingPair(
|
||||
symbol=asset['symbol'],
|
||||
exchange=self.name,
|
||||
@@ -233,7 +243,10 @@ class Exchange:
|
||||
end_date=end_date,
|
||||
leverage=leverage,
|
||||
asset_name=asset_name,
|
||||
min_trade_size=min_trade_size
|
||||
min_trade_size=min_trade_size,
|
||||
end_daily=end_daily,
|
||||
end_minute=end_minute,
|
||||
exchange_symbol=exchange_symbol
|
||||
)
|
||||
|
||||
self.assets[exchange_symbol] = trading_pair
|
||||
|
||||
@@ -10,7 +10,8 @@ from catalyst.data.minute_bars import BcolzMinuteOverlappingData, \
|
||||
BcolzMinuteBarWriter, BcolzMinuteBarReader
|
||||
from catalyst.data.us_equity_pricing import BcolzDailyBarWriter, \
|
||||
BcolzDailyBarReader
|
||||
from catalyst.exchange.bundle_utils import fetch_candles_chunk
|
||||
from catalyst.exchange.bundle_utils import fetch_candles_chunk, get_history, \
|
||||
get_seconds_from_date
|
||||
from catalyst.exchange.exchange_utils import get_exchange_folder
|
||||
from catalyst.exchange.init_utils import get_exchange
|
||||
from catalyst.utils.cli import maybe_show_progress
|
||||
@@ -136,11 +137,12 @@ class ExchangeBundle:
|
||||
if len(os.listdir(output_dir)) > 0:
|
||||
self._writers[key] = BcolzDailyBarWriter.open(output_dir, end)
|
||||
else:
|
||||
end_session = end.floor('1d')
|
||||
self._writers[key] = BcolzDailyBarWriter(
|
||||
filename=output_dir,
|
||||
calendar=open_calendar,
|
||||
start_session=start,
|
||||
end_session=end
|
||||
end_session=end_session
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
@@ -213,14 +215,26 @@ class ExchangeBundle:
|
||||
log.debug('the data chunk already exists')
|
||||
return
|
||||
|
||||
# TODO: ensure correct behavior for assets starting in the chunk
|
||||
candles = fetch_candles_chunk(
|
||||
exchange=self.exchange,
|
||||
assets=missing_assets,
|
||||
data_frequency=data_frequency,
|
||||
end_dt=chunk_end,
|
||||
bar_count=chunk['bar_count']
|
||||
)
|
||||
if data_frequency == 'minute':
|
||||
# TODO: ensure correct behavior for assets starting in the chunk
|
||||
candles = fetch_candles_chunk(
|
||||
exchange=self.exchange,
|
||||
assets=missing_assets,
|
||||
data_frequency=data_frequency,
|
||||
end_dt=chunk_end,
|
||||
bar_count=chunk['bar_count']
|
||||
)
|
||||
else:
|
||||
for asset in missing_assets:
|
||||
# TODO: switch to Catalyst symbol convention
|
||||
candles = get_history(
|
||||
exchange_name=self.exchange.name,
|
||||
data_frequency=data_frequency,
|
||||
symbol=asset.exchange_symbol,
|
||||
start_seconds=get_seconds_from_date(chunk_start),
|
||||
end_seconds=get_seconds_from_date(chunk_end)
|
||||
)
|
||||
pass
|
||||
|
||||
num_candles = 0
|
||||
data = []
|
||||
|
||||
@@ -8,7 +8,7 @@ log = Logger('test_exchange_bundle')
|
||||
|
||||
|
||||
class ExchangeBundleTestCase:
|
||||
def test_ingest(self):
|
||||
def test_ingest_minute(self):
|
||||
exchange_name = 'bitfinex'
|
||||
|
||||
start = pd.to_datetime('2017-09-01', utc=True)
|
||||
@@ -26,3 +26,22 @@ class ExchangeBundleTestCase:
|
||||
show_progress=True
|
||||
)
|
||||
pass
|
||||
|
||||
def test_ingest_daily(self):
|
||||
exchange_name = 'bitfinex'
|
||||
|
||||
start = pd.to_datetime('2017-09-01', utc=True)
|
||||
end = pd.Timestamp.utcnow()
|
||||
|
||||
exchange_bundle = ExchangeBundle(exchange_name)
|
||||
|
||||
log.info('ingesting exchange bundle {}'.format(exchange_name))
|
||||
exchange_bundle.ingest(
|
||||
data_frequency='daily',
|
||||
include_symbols='neo_btc',
|
||||
exclude_symbols=None,
|
||||
start=start,
|
||||
end=end,
|
||||
show_progress=True
|
||||
)
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user