Fixed issues in the prepare_chunk logic

This commit is contained in:
fredfortier
2017-10-24 20:03:50 -04:00
parent 1cc34a1485
commit 1cfe3b1bb2
2 changed files with 60 additions and 34 deletions
+2 -2
View File
@@ -20,7 +20,7 @@ def handle_data(context, data):
context.asset,
fields='price',
bar_count=15,
frequency='1d'
frequency='1m'
)
rsi = talib.RSI(prices.values, timeperiod=14)[-1]
print('got rsi: {}'.format(rsi))
@@ -31,7 +31,7 @@ run_algorithm(
capital_base=250,
start=pd.to_datetime('2017-08-01', utc=True),
end=pd.to_datetime('2017-9-30', utc=True),
data_frequency='daily',
data_frequency='minute',
initialize=initialize,
handle_data=handle_data,
analyze=None,
+58 -32
View File
@@ -179,10 +179,7 @@ class ExchangeBundle:
# This is workaround, there is an issue with empty
# session_label when using a newly created writer
key = writer._rootdir if data_frequency == 'minute' \
else writer._filename
del self._writers[key]
del self._writers[writer._rootdir]
writer = self.get_writer(writer._start_session,
writer._end_session, data_frequency)
@@ -295,6 +292,7 @@ class ExchangeBundle:
if not df.empty:
df.sort_index(inplace=True)
data.append((asset.sid, df))
self._write(data, writer, data_frequency)
if cleanup:
@@ -359,24 +357,26 @@ class ExchangeBundle:
chunks = []
for asset in assets:
try:
asset_start, asset_end = self.get_adj_dates(
# Checking if the the asset has price data in the specified
# date range
adj_start, adj_end = self.get_adj_dates(
start_dt, end_dt, [asset], data_frequency
)
except NoDataAvailableOnExchange:
# If not, we continue to the next asset
continue
# This is either the first trading day of the asset or the
# first session available in the calendar
first_trading_dt = asset.start_date \
if asset.start_date > self.calendar.first_session \
else self.calendar.first_session
# Aligning start / end dates with the daily calendar
sessions = get_periods_range(start_dt, end_dt, data_frequency) \
if data_frequency == 'minute' \
else self.calendar.sessions_in_range(start_dt, end_dt)
if asset_start < sessions[0]:
asset_start = sessions[0]
if asset_end > sessions[-1]:
asset_end = sessions[-1]
sessions = self.calendar.sessions_in_range(adj_start, adj_end)
# We loop through each session to create chunks for each period
chunk_labels = []
dt = sessions[0]
while dt <= sessions[-1]:
@@ -390,38 +390,49 @@ class ExchangeBundle:
# of the trading pair
if data_frequency == 'minute':
period_start, period_end = get_month_start_end(dt)
asset_start_month, _ = get_month_start_end(asset_start)
if asset.start_date > period_start:
# TODO: redundant gate, we are already filtering dates
if first_trading_dt > period_start:
dt += timedelta(days=1)
continue
asset_start_month, _ = get_month_start_end(
first_trading_dt
)
if asset_start_month == period_start \
and period_start < asset_start:
period_start = asset_start
and period_start < first_trading_dt:
period_start = first_trading_dt
# TODO: ensure to filter out closed currencies
_, asset_end_month = get_month_start_end(asset_end)
# TODO: need to filter closed pairs?
_, asset_end_month = get_month_start_end(
asset.end_minute
)
if asset_end_month == period_end \
and period_end > asset_end:
period_end = asset_end
and period_end > asset.end_minute:
period_end = asset.end_minute
elif data_frequency == 'daily':
period_start, period_end = get_year_start_end(dt)
asset_start_year, _ = get_year_start_end(asset_start)
if asset.start_date > period_start:
# TODO: redundant gate, we are already filtering dates
if first_trading_dt > period_start:
dt += timedelta(days=1)
continue
asset_start_year, _ = get_year_start_end(
first_trading_dt
)
if asset_start_year == period_start \
and period_start < asset_start:
period_start = asset_start
and period_start < first_trading_dt:
period_start = first_trading_dt
_, asset_end_year = get_year_start_end(asset_end)
_, asset_end_year = get_year_start_end(
asset.end_minute
)
if asset_end_year == period_end \
and period_end > asset_end:
period_end = asset_end
and period_end > asset.end_minute:
period_end = asset.end_minute
else:
raise InvalidHistoryFrequencyError(
frequency=data_frequency
@@ -431,10 +442,13 @@ class ExchangeBundle:
# Checking the last minute of the day instead.
range_start = period_start.replace(hour=23, minute=59) \
if data_frequency == 'minute' else period_start
# Checking if the data already exists in the bundle
# for the date range of the chunk. If not, we create
# a chunk for ingestion.
has_data = range_in_bundle(
asset, range_start, period_end, reader
)
if not has_data:
log.debug('adding period: {}'.format(label))
chunks.append(
@@ -448,6 +462,7 @@ class ExchangeBundle:
dt += timedelta(days=1)
# We sort the chunks by end date to ingest most recent data first
chunks.sort(key=lambda chunk: chunk['period_end'])
return chunks
@@ -462,13 +477,24 @@ class ExchangeBundle:
:param end_dt:
:return:
"""
writer = self.get_writer(start_dt, end_dt, data_frequency)
chunks = self.prepare_chunks(
assets=assets,
data_frequency=data_frequency,
start_dt=start_dt,
end_dt=end_dt
)
# Since chunks are either monthly or yearly, it is possible that
# our ingestion data range is greater than specified. We adjust
# the boundaries to ensure that the writer can write all data.
for chunk in chunks:
if chunk['period_start'] < start_dt:
start_dt = chunk['period_start']
if chunk['period_end'] > end_dt:
end_dt = chunk['period_end']
writer = self.get_writer(start_dt, end_dt, data_frequency)
with maybe_show_progress(
chunks,
show_progress,
@@ -485,7 +511,7 @@ class ExchangeBundle:
end_dt=chunk['period_end'],
writer=writer,
empty_rows_behavior='strip',
cleanup=False
cleanup=True
)
def ingest(self, data_frequency, include_symbols=None,