diff --git a/catalyst/examples/simple_loop.py b/catalyst/examples/simple_loop.py index 46965b98..d9bf988e 100644 --- a/catalyst/examples/simple_loop.py +++ b/catalyst/examples/simple_loop.py @@ -20,7 +20,7 @@ def handle_data(context, data): context.asset, fields='price', bar_count=15, - frequency='1d' + frequency='1m' ) rsi = talib.RSI(prices.values, timeperiod=14)[-1] print('got rsi: {}'.format(rsi)) @@ -31,7 +31,7 @@ run_algorithm( capital_base=250, start=pd.to_datetime('2017-08-01', utc=True), end=pd.to_datetime('2017-9-30', utc=True), - data_frequency='daily', + data_frequency='minute', initialize=initialize, handle_data=handle_data, analyze=None, diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index 30579f5c..b86639aa 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -179,10 +179,7 @@ class ExchangeBundle: # This is workaround, there is an issue with empty # session_label when using a newly created writer - key = writer._rootdir if data_frequency == 'minute' \ - else writer._filename - - del self._writers[key] + del self._writers[writer._rootdir] writer = self.get_writer(writer._start_session, writer._end_session, data_frequency) @@ -295,6 +292,7 @@ class ExchangeBundle: if not df.empty: df.sort_index(inplace=True) data.append((asset.sid, df)) + self._write(data, writer, data_frequency) if cleanup: @@ -359,24 +357,26 @@ class ExchangeBundle: chunks = [] for asset in assets: try: - asset_start, asset_end = self.get_adj_dates( + # Checking if the asset has price data in the specified + # date range + adj_start, adj_end = self.get_adj_dates( start_dt, end_dt, [asset], data_frequency ) except NoDataAvailableOnExchange: + # If not, we continue to the next asset continue + # This is either the first trading day of the asset or the + # first session available in the calendar + first_trading_dt = asset.start_date \ + if asset.start_date > self.calendar.first_session \ + else 
self.calendar.first_session + # Aligning start / end dates with the daily calendar - sessions = get_periods_range(start_dt, end_dt, data_frequency) \ - if data_frequency == 'minute' \ - else self.calendar.sessions_in_range(start_dt, end_dt) - - if asset_start < sessions[0]: - asset_start = sessions[0] - - if asset_end > sessions[-1]: - asset_end = sessions[-1] + sessions = self.calendar.sessions_in_range(adj_start, adj_end) + # We loop through each session to create chunks for each period chunk_labels = [] dt = sessions[0] while dt <= sessions[-1]: @@ -390,38 +390,49 @@ class ExchangeBundle: # of the trading pair if data_frequency == 'minute': period_start, period_end = get_month_start_end(dt) - asset_start_month, _ = get_month_start_end(asset_start) - if asset.start_date > period_start: + # TODO: redundant gate, we are already filtering dates + if first_trading_dt > period_start: dt += timedelta(days=1) continue + asset_start_month, _ = get_month_start_end( + first_trading_dt + ) if asset_start_month == period_start \ - and period_start < asset_start: - period_start = asset_start + and period_start < first_trading_dt: + period_start = first_trading_dt - # TODO: ensure to filter out closed currencies - _, asset_end_month = get_month_start_end(asset_end) + # TODO: need to filter closed pairs? 
+ _, asset_end_month = get_month_start_end( + asset.end_minute + ) if asset_end_month == period_end \ - and period_end > asset_end: - period_end = asset_end + and period_end > asset.end_minute: + period_end = asset.end_minute elif data_frequency == 'daily': period_start, period_end = get_year_start_end(dt) - asset_start_year, _ = get_year_start_end(asset_start) - if asset.start_date > period_start: + # TODO: redundant gate, we are already filtering dates + if first_trading_dt > period_start: dt += timedelta(days=1) continue + asset_start_year, _ = get_year_start_end( + first_trading_dt + ) if asset_start_year == period_start \ - and period_start < asset_start: - period_start = asset_start + and period_start < first_trading_dt: + period_start = first_trading_dt - _, asset_end_year = get_year_start_end(asset_end) + _, asset_end_year = get_year_start_end( + asset.end_minute + ) if asset_end_year == period_end \ - and period_end > asset_end: - period_end = asset_end + and period_end > asset.end_minute: + period_end = asset.end_minute + else: raise InvalidHistoryFrequencyError( frequency=data_frequency @@ -431,10 +442,13 @@ class ExchangeBundle: # Checking the last minute of the day instead. range_start = period_start.replace(hour=23, minute=59) \ if data_frequency == 'minute' else period_start + + # Checking if the data already exists in the bundle + # for the date range of the chunk. If not, we create + # a chunk for ingestion. 
has_data = range_in_bundle( asset, range_start, period_end, reader ) - if not has_data: log.debug('adding period: {}'.format(label)) chunks.append( @@ -448,6 +462,7 @@ class ExchangeBundle: dt += timedelta(days=1) + # We sort the chunks by end date to ingest most recent data first chunks.sort(key=lambda chunk: chunk['period_end']) return chunks @@ -462,13 +477,24 @@ class ExchangeBundle: :param end_dt: :return: """ - writer = self.get_writer(start_dt, end_dt, data_frequency) chunks = self.prepare_chunks( assets=assets, data_frequency=data_frequency, start_dt=start_dt, end_dt=end_dt ) + + # Since chunks are either monthly or yearly, it is possible that + # our ingestion data range is greater than specified. We adjust + # the boundaries to ensure that the writer can write all data. + for chunk in chunks: + if chunk['period_start'] < start_dt: + start_dt = chunk['period_start'] + + if chunk['period_end'] > end_dt: + end_dt = chunk['period_end'] + + writer = self.get_writer(start_dt, end_dt, data_frequency) with maybe_show_progress( chunks, show_progress, @@ -485,7 +511,7 @@ class ExchangeBundle: end_dt=chunk['period_end'], writer=writer, empty_rows_behavior='strip', - cleanup=False + cleanup=True ) def ingest(self, data_frequency, include_symbols=None,