diff --git a/catalyst/__main__.py b/catalyst/__main__.py index 7a9e6316..bdd9e537 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -126,7 +126,7 @@ def ipython_only(option): ) @click.option( '--data-frequency', - type=click.Choice({'daily', '5-minute', 'minute'}), + type=click.Choice({'daily', 'minute'}), default='daily', show_default=True, help='The data frequency of the simulation.', @@ -196,8 +196,8 @@ def ipython_only(option): @click.option( '-x', '--exchange-name', - type=click.Choice({'bitfinex', 'bittrex'}), - help='The name of the targeted exchange (supported: bitfinex, bittrex).', + type=click.Choice({'bitfinex', 'bittrex', 'poloniex'}), + help='The name of the targeted exchange (supported: bitfinex, bittrex, poloniex).', ) @click.option( '-n', diff --git a/catalyst/algorithm.py b/catalyst/algorithm.py index 51fdfdba..65e2fa60 100644 --- a/catalyst/algorithm.py +++ b/catalyst/algorithm.py @@ -134,10 +134,7 @@ from catalyst.utils.security_list import SecurityList import catalyst.protocol from catalyst.sources.requests_csv import PandasRequestsCSV -from catalyst.gens.sim_engine import ( - MinuteSimulationClock, - FiveMinuteSimulationClock, -) +from catalyst.gens.sim_engine import MinuteSimulationClock from catalyst.sources.benchmark_source import BenchmarkSource from catalyst.catalyst_warnings import ZiplineDeprecationWarning @@ -174,7 +171,7 @@ class TradingAlgorithm(object): algo_filename : str, optional The filename for the algoscript. This will be used in exception tracebacks. default: ''. - data_frequency : {'daily', '5-minute', 'minute'}, optional + data_frequency : {'daily', 'minute'}, optional The duration of the bars. instant_fill : bool, optional Whether to fill orders immediately or on next bar. default: False @@ -227,7 +224,7 @@ class TradingAlgorithm(object): script : str Algoscript that contains initialize and handle_data function definition. - data_frequency : {'daily', '5-minute', 'minute'} + data_frequency : {'daily', 'minute'} The duration of the bars. capital_base : float How much capital to start with. @@ -435,8 +432,6 @@ class TradingAlgorithm(object): if get_loader is not None: if data_frequency == 'daily': all_dates = self.trading_calendar.all_sessions - elif data_frequency == '5-minute': - all_dates = self.trading_calendar.all_five_minutes elif data_frequency == 'minute': all_dates = self.trading_calendar.all_minutes else: @@ -468,7 +463,7 @@ class TradingAlgorithm(object): self._in_before_trading_start = True with handle_non_market_minutes(data) if \ - self.data_frequency in ('minute', '5-minute') else ExitStack(): + self.data_frequency == 'minute' else ExitStack(): self._before_trading_start(self, data) self._in_before_trading_start = False @@ -524,11 +519,10 @@ class TradingAlgorithm(object): market_closes = trading_o_and_c['market_close'] minutely_emission = False - if self.sim_params.data_frequency in set(('minute', '5-minute')): + if self.sim_params.data_frequency == 'minute': market_opens = trading_o_and_c['market_open'] - minutely_emission = self.sim_params.emission_rate in \ - set(('minute', '5-minute')) + minutely_emission = self.sim_params.emission_rate == 'minute' else: # in daily mode, we want to have one bar per session, timestamped # as the last minute of the session. @@ -552,15 +546,6 @@ class TradingAlgorithm(object): 'UTC', ) - if self.sim_params.data_frequency == '5-minute': - return FiveMinuteSimulationClock( - self.sim_params.sessions, - execution_opens, - execution_closes, - before_trading_start_minutes, - minute_emission=minutely_emission, - ) - return MinuteSimulationClock( self.sim_params.sessions, execution_opens, @@ -692,8 +677,6 @@ class TradingAlgorithm(object): time_count = times.nunique() if time_count == 1: self.sim_params.data_frequency = 'daily' - elif time_count == 288: - self.sim_params.data_frequency = '5-minute' else: self.sim_params.data_frequency = 'minute' @@ -715,8 +698,6 @@ class TradingAlgorithm(object): if self.sim_params.data_frequency == 'daily': equity_reader_arg = 'equity_daily_reader' - elif self.sim_params.data_frequency == '5-minute': - equity_daily_reader = 'equity_5_minute_reader' elif self.sim_params.data_frequency == 'minute': equity_reader_arg = 'equity_minute_reader' equity_reader = PanelBarReader( @@ -960,9 +941,9 @@ class TradingAlgorithm(object): The arena from the simulation parameters. This will normally be ``'backtest'`` but some systems may use this distinguish live trading from backtesting. - data_frequency : {'daily', '5-minute', 'minute'} + data_frequency : {'daily', 'minute'} data_frequency tells the algorithm if it is running with - daily, minute, or five-minute mode. + daily or minute mode. start : datetime The start date for the simulation. end : datetime @@ -1137,18 +1118,11 @@ class TradingAlgorithm(object): 'time_rule= when calling schedule_function without ' 'specifying a date_rule', stacklevel=3) - freq = self.sim_params.data_frequency - date_rule = date_rule or date_rules.every_day() - if freq is 'daily': - # ignore time rule in daily mode - time_rule = time_rules.every_minute() - else: - # use provided time rule or default to every minute or 5 minutes - # based on desired data frequency. - time_rule = time_rule or (time_rules.every_5_minutes() - if freq is '5-minute' else - time_rules.every_minute()) + time_rule = ((time_rule or time_rules.every_minute()) + if self.sim_params.data_frequency == 'minute' else + # If we are in daily mode the time_rule is ignored. + time_rules.every_minute()) # Check the type of the algorithm's schedule before pulling calendar # Note that the ExchangeTradingSchedule is currently the only @@ -1819,7 +1793,7 @@ class TradingAlgorithm(object): @data_frequency.setter def data_frequency(self, value): - assert value in ('daily', '5-minute', 'minute') + assert value in ('daily', 'minute') self.sim_params.data_frequency = value @api_method diff --git a/catalyst/data/_minute_bar_internal.pyx b/catalyst/data/_minute_bar_internal.pyx index 9ebb0841..bea12bc2 100644 --- a/catalyst/data/_minute_bar_internal.pyx +++ b/catalyst/data/_minute_bar_internal.pyx @@ -35,17 +35,6 @@ def minute_value(ndarray[long_t, ndim=1] market_opens, return market_opens[q] + r -@cython.cdivision(True) -def five_minute_value(ndarray[long_t, ndim=1] market_opens, - Py_ssize_t pos, - short five_minutes_per_day): - - cdef short q, r - q = cython.cdiv(pos, five_minutes_per_day) - r = cython.cmod(pos, five_minutes_per_day) - - return market_opens[q] + r - def find_position_of_minute(ndarray[long_t, ndim=1] market_opens, ndarray[long_t, ndim=1] market_closes, long_t minute_val, @@ -99,26 +88,6 @@ def find_position_of_minute(ndarray[long_t, ndim=1] market_opens, return (market_open_loc * minutes_per_day) + delta -def find_position_of_five_minute(ndarray[long_t, ndim=1] market_opens, - ndarray[long_t, ndim=1] market_closes, - long_t five_minute_val, - short five_minutes_per_day, - bool forward_fill): - - cdef Py_ssize_t market_open_loc, market_open, delta - - market_open_loc = \ - searchsorted(market_opens, five_minute_val, side='right') - 1 - market_open = market_opens[market_open_loc] - market_close = market_closes[market_open_loc] - - if not forward_fill and ((five_minute_val - market_open) >= five_minutes_per_day): - raise ValueError("Given five minutes is not between an open and a close") - - delta = int_min(five_minute_val - market_open, market_close - market_open) - - return (market_open_loc * five_minutes_per_day) + delta - def find_last_traded_position_internal( ndarray[long_t, ndim=1] market_opens, ndarray[long_t, ndim=1] market_closes, @@ -189,50 +158,3 @@ def find_last_traded_position_internal( # found a trade event return -1 -def find_last_traded_five_minute_position_internal( - ndarray[long_t, ndim=1] market_opens, - ndarray[long_t, ndim=1] market_closes, - long_t end_five_minute, - long_t start_five_minute, - volumes, - short five_minutes_per_day): - cdef Py_ssize_t minute_pos, current_minute, q - - five_minute_pos = int_min( - find_position_of_five_minute( - market_opens, - market_closes, - end_five_minute, - five_minutes_per_day, - True, - ), - len(volumes) - 1, - ) - - while five_minute_pos >= 0: - current_five_minute = five_minute_value( - market_opens, five_minute_pos, five_minutes_per_day - ) - - q = cython.cdiv(five_minute_pos, five_minutes_per_day) - if current_five_minute > market_closes[q]: - five_minute_pos = find_position_of_five_minute( - market_opens, - market_closes, - market_closes[q], - five_minutes_per_day, - False, - ) - continue - - if current_five_minute < start_five_minute: - return -1 - - if volumes[five_minute_pos] != 0: - return five_minute_pos - - five_minute_pos -= 1 - - # we've gone to the beginning of this asset's range, and still haven't - # found a trade event - return -1 diff --git a/catalyst/data/bundles/base.py b/catalyst/data/bundles/base.py index 135dd531..6af7a0eb 100644 --- a/catalyst/data/bundles/base.py +++ b/catalyst/data/bundles/base.py @@ -60,10 +60,6 @@ class BaseBundle(object): def minutes_per_day(self): raise NotImplementedError() - @lazyval - def five_minutes_per_day(self): - raise NotImplementedError() - @lazyval def frequencies(self): raise NotImplementedError() @@ -115,7 +111,6 @@ class BaseBundle(object): environ, asset_db_writer, minute_bar_writer, - five_minute_bar_writer, daily_bar_writer, adjustment_writer, calendar, @@ -162,7 +157,7 @@ class BaseBundle(object): # Post-process metadata using cached symbol frames, and write to # disk. This metadata must be written before any attempt to write - # either minute or 5-minute data. + # minute data. metadata = self._post_process_metadata( raw_metadata, cache, @@ -170,26 +165,6 @@ class BaseBundle(object): ) asset_db_writer.write(metadata) - # Compile 5-minute symbol data if bundle supports 5-minute mode and - # persist the dataset to disk. - ''' - if '5-minute' in self.frequencies: - five_minute_bar_writer.write( - self._fetch_symbol_iter( - api_key, - cache, - symbol_map, - calendar, - start_session, - end_session, - '5-minute', - retries, - ), - length=len(symbol_map), - show_progress=show_progress, - ) - ''' - # Compile minute symbol data if bundle supports minute mode and # persist the dataset to disk. if 'minute' in self.frequencies: diff --git a/catalyst/data/bundles/base_pricing.py b/catalyst/data/bundles/base_pricing.py index c5281fdd..7b94e4bc 100644 --- a/catalyst/data/bundles/base_pricing.py +++ b/catalyst/data/bundles/base_pricing.py @@ -47,10 +47,6 @@ class BaseCryptoPricingBundle(BasePricingBundle): def minutes_per_day(self): return 1440 - @lazyval - def five_minutes_per_day(self): - return 288 - @property def splits(self): return [] @@ -68,10 +64,6 @@ class BaseEquityPricingBundle(BasePricingBundle): def minutes_per_day(self): return 390 - @lazyval - def five_minutes_per_day(self): - return 78 - @property def splits(self): return self._splits diff --git a/catalyst/data/bundles/core.py b/catalyst/data/bundles/core.py index 29aceb7a..a25591de 100644 --- a/catalyst/data/bundles/core.py +++ b/catalyst/data/bundles/core.py @@ -17,10 +17,6 @@ from ..us_equity_pricing import ( SQLiteAdjustmentReader, SQLiteAdjustmentWriter, ) -from ..five_minute_bars import ( - BcolzFiveMinuteBarReader, - BcolzFiveMinuteBarWriter, -) from ..minute_bars import ( BcolzMinuteBarReader, BcolzMinuteBarWriter, @@ -54,11 +50,6 @@ def minute_path(bundle_name, timestr, environ=None): environ=environ, ) -def five_minute_path(bundle_name, timestr, environ=None): - return pth.data_path( - five_minute_relative(bundle_name, timestr, environ), - environ=environ, - ) def daily_path(bundle_name, timestr, environ=None): return pth.data_path( @@ -92,8 +83,6 @@ def cache_relative(bundle_name, timestr, environ=None): def daily_relative(bundle_name, timestr, environ=None): return bundle_name, timestr, 'daily_equities.bcolz' -def five_minute_relative(bundle_name, timestr, environ=None): - return bundle_name, timestr, 'five_minute.bcolz' def minute_relative(bundle_name, timestr, environ=None): return bundle_name, timestr, 'minute_equities.bcolz' @@ -206,14 +195,13 @@ RegisteredBundle = namedtuple( 'start_session', 'end_session', 'minutes_per_day', - 'five_minutes_per_day', 'ingest', 'create_writers'] ) BundleData = namedtuple( 'BundleData', - 'asset_finder minute_bar_reader five_minute_bar_reader daily_bar_reader ' + 'asset_finder minute_bar_reader daily_bar_reader ' 'adjustment_reader', ) @@ -303,7 +291,6 @@ def _make_bundle_core(): bundle.ingest, calendar_name=bundle.calendar_name, minutes_per_day=bundle.minutes_per_day, - five_minutes_per_day=bundle.five_minutes_per_day, start_session=start_session, end_session=end_session, create_writers=create_writers, @@ -316,7 +303,6 @@ def _make_bundle_core(): start_session=None, end_session=None, minutes_per_day=1440, - five_minutes_per_day=288, create_writers=True): """Register a data bundle ingest function. @@ -397,7 +383,6 @@ def _make_bundle_core(): start_session=start_session, end_session=end_session, minutes_per_day=minutes_per_day, - five_minutes_per_day=five_minutes_per_day, ingest=f, create_writers=create_writers, ) @@ -496,16 +481,6 @@ def _make_bundle_core(): # that it can compute the adjustment ratios for the dividends. daily_bar_writer.write(()) - five_minute_bar_writer = BcolzFiveMinuteBarWriter( - wd.ensure_dir(*five_minute_relative( - name, timestr, environ=environ) - ), - calendar, - start_session, - end_session, - five_minutes_per_day=bundle.five_minutes_per_day, - ) - minute_bar_writer = BcolzMinuteBarWriter( wd.ensure_dir(*minute_relative( name, timestr, environ=environ) @@ -532,7 +507,6 @@ def _make_bundle_core(): ) else: daily_bar_writer = None - five_minute_bar_writer = None minute_bar_writer = None asset_db_writer = None adjustment_db_writer = None @@ -544,7 +518,6 @@ def _make_bundle_core(): environ, asset_db_writer, minute_bar_writer, - five_minute_bar_writer, daily_bar_writer, adjustment_db_writer, calendar, @@ -631,9 +604,6 @@ def _make_bundle_core(): minute_bar_reader=BcolzMinuteBarReader( minute_path(name, timestr, environ=environ), ), - five_minute_bar_reader=BcolzFiveMinuteBarReader( - five_minute_path(name, timestr, environ=environ), - ), daily_bar_reader=BcolzDailyBarReader( daily_path(name, timestr, environ=environ), ), diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index e161df95..64cc2d27 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -148,7 +148,6 @@ class PoloniexBundle(BaseCryptoPricingBundle): data_frequency): period_map = { 'daily': 86400, -# '5-minute': 300, } try: diff --git a/catalyst/data/data_portal.py b/catalyst/data/data_portal.py index 69aa166e..6bfe6047 100644 --- a/catalyst/data/data_portal.py +++ b/catalyst/data/data_portal.py @@ -42,7 +42,6 @@ from catalyst.assets.roll_finder import ( ) from catalyst.data.dispatch_bar_reader import ( AssetDispatchMinuteBarReader, - AssetDispatchFiveMinuteBarReader, AssetDispatchSessionBarReader ) from catalyst.data.resample import ( @@ -120,10 +119,6 @@ class DataPortal(object): daily data backtests or daily history calls in a minute backetest. If a daily bar reader is not provided but a minute bar reader is, the minutes will be rolled up to serve the daily requests. - five_minute_reader : BcolzFiveMinuteBarReader, optional - The five minute bar reader for equities. This will be used to service - 5-minute data backtests or five-minute history calls. This can be used - to serve daily calls if no daily bar reader is provided. minute_reader : BcolzMinuteBarReader, optional The minute bar reader for equities. This will be used to service minute data backtests or minute history calls. This can be used @@ -150,7 +145,6 @@ class DataPortal(object): trading_calendar, first_trading_day, daily_reader=None, - five_minute_reader=None, minute_reader=None, future_daily_reader=None, future_minute_reader=None, @@ -202,7 +196,6 @@ class DataPortal(object): reader.last_available_dt for reader in [ minute_reader, - five_minute_reader, future_minute_reader, ] if reader is not None @@ -214,8 +207,6 @@ class DataPortal(object): aligned_minute_reader = self._ensure_reader_aligned( minute_reader) - aligned_five_minute_reader = self._ensure_reader_aligned( - five_minute_reader) aligned_session_reader = self._ensure_reader_aligned( daily_reader) aligned_future_minute_reader = self._ensure_reader_aligned( @@ -229,13 +220,10 @@ class DataPortal(object): } aligned_minute_readers = {} - aligned_five_minute_readers = {} aligned_session_readers = {} if aligned_minute_reader is not None: aligned_minute_readers[Equity] = aligned_minute_reader - if aligned_five_minute_reader is not None: - aligned_five_minute_readers[Equity] = aligned_five_minute_reader if aligned_session_reader is not None: aligned_session_readers[Equity] = aligned_session_reader @@ -267,13 +255,6 @@ class DataPortal(object): self._last_available_minute, ) - _dispatch_five_minute_reader = AssetDispatchFiveMinuteBarReader( - self.trading_calendar, - self.asset_finder, - aligned_five_minute_readers, - self._last_available_minute, - ) - _dispatch_session_reader = AssetDispatchSessionBarReader( self.trading_calendar, self.asset_finder, @@ -283,7 +264,6 @@ class DataPortal(object): self._pricing_readers = { 'minute': _dispatch_minute_reader, - '5-minute': _dispatch_five_minute_reader, 'daily': _dispatch_session_reader, } @@ -719,17 +699,6 @@ class DataPortal(object): spot_value=result ) - - def _get_five_minute_spot_value(self, asset, column, dt, ffill=False): - return self._get_minutely_spot_value( - asset, - column, - dt, - ffill, - '5-minute', - ) - - def _get_minute_spot_value(self, asset, column, dt, ffill=False): return self._get_minutely_spot_value( asset, diff --git a/catalyst/data/dispatch_bar_reader.py b/catalyst/data/dispatch_bar_reader.py index e545eef0..e72dab7b 100644 --- a/catalyst/data/dispatch_bar_reader.py +++ b/catalyst/data/dispatch_bar_reader.py @@ -135,12 +135,6 @@ class AssetDispatchMinuteBarReader(AssetDispatchBarReader): def _dt_window_size(self, start_dt, end_dt): return len(self.trading_calendar.minutes_in_range(start_dt, end_dt)) - -class AssetDispatchFiveMinuteBarReader(AssetDispatchBarReader): - - def _dt_window_size(self, start_dt, end_dt): - return len(self.trading_calendar.five_minutes_in_range(start_dt, end_dt)) - class AssetDispatchSessionBarReader(AssetDispatchBarReader): def _dt_window_size(self, start_dt, end_dt): diff --git a/catalyst/data/five_minute_bars.py b/catalyst/data/five_minute_bars.py deleted file mode 100644 index 9021dc0a..00000000 --- a/catalyst/data/five_minute_bars.py +++ /dev/null @@ -1,1385 +0,0 @@ -# Copyright 2016 Quantopian, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from abc import ABCMeta, abstractmethod -import json -import os -from glob import glob -from os.path import join -from textwrap import dedent - -from lru import LRU -import bcolz -from bcolz import ctable -from intervaltree import IntervalTree -import logbook -import numpy as np -from numpy import ( - iinfo, - uint64, -) -import pandas as pd -from pandas import HDFStore -import tables -from six import with_metaclass -from toolz import keymap, valmap - -from catalyst.data._minute_bar_internal import ( - five_minute_value, - find_position_of_five_minute, - find_last_traded_five_minute_position_internal, -) - -from catalyst.gens.sim_engine import NANOS_IN_MINUTE - -from catalyst.data.bar_reader import BarReader, NoDataOnDate -from catalyst.data.us_equity_pricing import ( - winsorise_uint64, - check_uint64_safe, -) -from catalyst.utils.calendars import get_calendar -from catalyst.utils.cli import ( - item_show_count, - maybe_show_progress, -) -from catalyst.utils.memoize import lazyval - -logger = logbook.Logger('FiveMinuteBars') - -OPEN_FIVE_MINUTES_PER_DAY = 288 - -DEFAULT_EXPECTEDLEN_CRYPTO = OPEN_FIVE_MINUTES_PER_DAY * 366 * 15 - -OHLC_RATIO = 1000000 - -OHLC = frozenset(['open', 'high', 'low', 'close']) -OHLCV = frozenset(['open', 'high', 'low', 'close', 'volume']) - -UINT64_MAX = iinfo(uint64).max - -NANOS_IN_FIVE_MINUTES = 5 * NANOS_IN_MINUTE - -class BcolzFiveMinuteOverlappingData(Exception): - pass - - -class BcolzFiveMinuteWriterColumnMismatch(Exception): - pass - - -class FiveMinuteBarReader(BarReader): - @property - def data_frequency(self): - return "5-minute" - - -def _calc_five_minute_index(market_opens, five_minutes_per_day): - five_minutes = np.zeros(len(market_opens) * five_minutes_per_day, - dtype='datetime64[ns]') - deltas = 5 * np.arange(0, five_minutes_per_day, dtype='timedelta64[m]') - for i, market_open in enumerate(market_opens): - start = market_open.asm8 - five_minute_values = start + deltas - start_ix = five_minutes_per_day * i - end_ix = start_ix + five_minutes_per_day - five_minutes[start_ix:end_ix] = five_minute_values - return pd.to_datetime(five_minutes, utc=True, box=True) - - -def _sid_subdir_path(sid): - """ - Format subdir path to limit the number directories in any given - subdirectory to 100. - - The number in each directory is designed to support at least 100000 - equities. - - Parameters - ---------- - sid : int - Asset identifier. - - Returns - ------- - out : string - A path for the bcolz rootdir, including subdirectory prefixes based on - the padded string representation of the given sid. - - e.g. 1 is formatted as 00/00/000001.bcolz - """ - padded_sid = format(sid, '06') - return os.path.join( - # subdir 1 00/XX - padded_sid[0:2], - # subdir 2 XX/00 - padded_sid[2:4], - "{0}.bcolz".format(str(padded_sid)) - ) - - -def convert_cols(cols, scale_factor, sid, invalid_data_behavior): - """Adapt OHLCV columns into uint64 columns. - - Parameters - ---------- - cols : dict - A dict mapping each column name (open, high, low, close, volume) - to a float column to convert to uint64. - scale_factor : int - Factor to use to scale float values before converting to uint64. - sid : int - Sid of the relevant asset, for logging. - invalid_data_behavior : str - Specifies behavior when data cannot be converted to uint64. - If 'raise', raises an exception. - If 'warn', logs a warning and filters out incompatible values. - If 'ignore', silently filters out incompatible values. - """ - scaled_opens = np.nan_to_num(cols['open']) * scale_factor - scaled_highs = np.nan_to_num(cols['high']) * scale_factor - scaled_lows = np.nan_to_num(cols['low']) * scale_factor - scaled_closes = np.nan_to_num(cols['close']) * scale_factor - volumes = np.nan_to_num(cols['volume']) - - exclude_mask = np.zeros_like(scaled_opens, dtype=bool) - - for col_name, scaled_col in [ - ('open', scaled_opens), - ('high', scaled_highs), - ('low', scaled_lows), - ('close', scaled_closes), - ('volume', volumes), - ]: - max_val = scaled_col.max() - - try: - check_uint64_safe(max_val, col_name) - except ValueError: - if invalid_data_behavior == 'raise': - raise - - if invalid_data_behavior == 'warn': - logger.warn( - 'Values for sid={}, col={} contain some too large for ' - 'uint64 (max={}), filtering them out', - sid, col_name, max_val, - ) - - # We want to exclude all rows that have an unsafe value in - # this column. - exclude_mask &= (scaled_col >= iinfo(uint64).max) - - # Convert all cols to uint64. - opens = scaled_opens.astype(uint64) - highs = scaled_highs.astype(uint64) - lows = scaled_lows.astype(uint64) - closes = scaled_closes.astype(uint64) - volumes = volumes.astype(uint64) - - # Exclude rows with unsafe values by setting to zero. - opens[exclude_mask] = 0 - highs[exclude_mask] = 0 - lows[exclude_mask] = 0 - closes[exclude_mask] = 0 - volumes[exclude_mask] = 0 - - return opens, highs, lows, closes, volumes - - -class BcolzFiveMinuteBarMetadata(object): - """ - Parameters - ---------- - ohlc_ratio : int - The factor by which the pricing data is multiplied so that the - float data can be stored as an integer. - calendar : catalyst.utils.calendars.trading_calendar.TradingCalendar - The TradingCalendar on which the five minute bars are based. - start_session : datetime - The first trading session in the data set. - end_session : datetime - The last trading session in the data set. - five_minutes_per_day : int - The number of minutes per each period. - """ - FORMAT_VERSION = 3 - - METADATA_FILENAME = 'five-minute-metadata.json' - - @classmethod - def metadata_path(cls, rootdir): - return os.path.join(rootdir, cls.METADATA_FILENAME) - - @classmethod - def read(cls, rootdir): - path = cls.metadata_path(rootdir) - with open(path) as fp: - raw_data = json.load(fp) - - try: - version = raw_data['version'] - except KeyError: - # Version was first written with version 1, assume 0, - # if version does not match. - version = 0 - - default_ohlc_ratio = raw_data['ohlc_ratio'] - - if version >= 1: - five_minutes_per_day = raw_data['five_minutes_per_day'] - else: - # version 0 always assumed crypto assets. - five_minutes_per_day = CRYPTO_ASSETS_FIVE_MINUTES_PER_DAY - - if version >= 2: - calendar = get_calendar(raw_data['calendar_name']) - start_session = pd.Timestamp( - raw_data['start_session'], tz='UTC') - end_session = pd.Timestamp(raw_data['end_session'], tz='UTC') - else: - # No calendar info included in older versions, so - # default to OPEN. - calendar = get_calendar('OPEN') - - start_session = pd.Timestamp( - raw_data['first_trading_day'], tz='UTC') - end_session = calendar.minute_to_session_label( - pd.Timestamp( - raw_data['market_closes'][-1], unit='m', tz='UTC') - ) - - if version >= 3: - ohlc_ratios_per_sid = raw_data['ohlc_ratios_per_sid'] - if ohlc_ratios_per_sid is not None: - ohlc_ratios_per_sid = keymap(int, ohlc_ratios_per_sid) - else: - ohlc_ratios_per_sid = None - - return cls( - default_ohlc_ratio, - ohlc_ratios_per_sid, - calendar, - start_session, - end_session, - five_minutes_per_day, - version=version, - ) - - def __init__( - self, - default_ohlc_ratio, - ohlc_ratios_per_sid, - calendar, - start_session, - end_session, - five_minutes_per_day, - version=FORMAT_VERSION, - ): - self.calendar = calendar - self.start_session = start_session - self.end_session = end_session - self.default_ohlc_ratio = default_ohlc_ratio - self.ohlc_ratios_per_sid = ohlc_ratios_per_sid - self.five_minutes_per_day = five_minutes_per_day - self.version = version - - def write(self, rootdir): - """ - Write the metadata to a JSON file in the rootdir. - - Values contained in the metadata are: - - version : int - The value of FORMAT_VERSION of this class. - ohlc_ratio : int - The default ratio by which to multiply the pricing data to - convert the floats from floats to an integer to fit within - the np.uint64. If ohlc_ratios_per_sid is None or does not - contain a mapping for a given sid, this ratio is used. - ohlc_ratios_per_sid : dict - A dict mapping each sid in the output to the factor by - which the pricing data is multiplied so that the float data - can be stored as an integer. - five_minutes_per_day : int - The number of minutes per each period. - calendar_name : str - The name of the TradingCalendar on which the minute bars are - based. - start_session : datetime - 'YYYY-MM-DD' formatted representation of the first trading - session in the data set. - end_session : datetime - 'YYYY-MM-DD' formatted representation of the last trading - session in the data set. - - Deprecated, but included for backwards compatibility: - - first_trading_day : string - 'YYYY-MM-DD' formatted representation of the first trading day - available in the dataset. - market_opens : list - List of int64 values representing UTC market opens as - minutes since epoch. - market_closes : list - List of int64 values representing UTC market closes as - minutes since epoch. - """ - - calendar = self.calendar - slicer = calendar.schedule.index.slice_indexer( - self.start_session, - self.end_session, - ) - schedule = calendar.schedule[slicer] - market_opens = schedule.market_open - market_closes = schedule.market_close - - metadata = { - 'version': self.version, - 'ohlc_ratio': self.default_ohlc_ratio, - 'ohlc_ratios_per_sid': self.ohlc_ratios_per_sid, - 'five_minutes_per_day': self.five_minutes_per_day, - 'calendar_name': self.calendar.name, - 'start_session': str(self.start_session.date()), - 'end_session': str(self.end_session.date()), - # Write these values for backwards compatibility - 'first_trading_day': str(self.start_session.date()), - 'market_opens': ( - market_opens.values.astype('datetime64[m]'). - astype(np.int64).tolist()), - 'market_closes': ( - market_closes.values.astype('datetime64[m]'). - astype(np.int64).tolist()), - } - with open(self.metadata_path(rootdir), 'w+') as fp: - json.dump(metadata, fp) - - -class BcolzFiveMinuteBarWriter(object): - """ - Class capable of writing minute OHLCV data to disk into bcolz format. - - Parameters - ---------- - rootdir : string - Path to the root directory into which to write the metadata and - bcolz subdirectories. - calendar : catalyst.utils.calendars.trading_calendar.TradingCalendar - The trading calendar on which to base the minute bars. Used to - get the market opens used as a starting point for each periodic - span of minutes in the index, and the market closes that - correspond with the market opens. - five_minutes_per_day : int - The number of minutes per each period. Defaults to 390, the mode - of minutes in NYSE trading days. - start_session : datetime - The first trading session in the data set. - end_session : datetime - The last trading session in the data set. - default_ohlc_ratio : int, optional - The default ratio by which to multiply the pricing data to - convert from floats to integers that fit within np.uint64. If - ohlc_ratios_per_sid is None or does not contain a mapping for a - given sid, this ratio is used. Default is OHLC_RATIO (1000). - ohlc_ratios_per_sid : dict, optional - A dict mapping each sid in the output to the ratio by which to - multiply the pricing data to convert the floats from floats to - an integer to fit within the np.uint64. - expectedlen : int, optional - The expected length of the dataset, used when creating the initial - bcolz ctable. - - If the expectedlen is not used, the chunksize and corresponding - compression ratios are not ideal. - - Defaults to supporting 15 years of NYSE equity market data. - see: http://bcolz.blosc.org/opt-tips.html#informing-about-the-length-of-your-carrays # noqa - write_metadata : bool, optional - If True, writes the minute bar metadata (on init of the writer). - If False, no metadata is written (existing metadata is - retained). Default is True. - - Notes - ----- - Writes a bcolz directory for each individual sid, all contained within - a root directory which also contains metadata about the entire dataset. - - Each individual asset's data is stored as a bcolz table with a column for - each pricing field: (open, high, low, close, volume) - - The open, high, low, and close columns are integers which are 1000 times - the quoted price, so that the data can represented and stored as an - np.uint64, supporting market prices quoted up to the thousands place. - - volume is a np.uint64 with no mutation of the tens place. - - The 'index' for each individual asset are a repeating period of minutes of - length `minutes_per_day` starting from each market open. - The file format does not account for half-days. - e.g.: - 2016-01-19 14:31 - 2016-01-19 14:32 - ... - 2016-01-19 20:59 - 2016-01-19 21:00 - 2016-01-20 14:31 - 2016-01-20 14:32 - ... - 2016-01-20 20:59 - 2016-01-20 21:00 - - All assets are written with a common 'index', sharing a common first - trading day. Assets that do not begin trading until after the first trading - day will have zeros for all pricing data up and until data is traded. - - 'index' is in quotations, because bcolz does not provide an index. The - format allows index-like behavior by writing each minute's data into the - corresponding position of the enumeration of the aforementioned datetime - index. - - The datetimes which correspond to each position are written in the metadata - as integer nanoseconds since the epoch into the `minute_index` key. - - See Also - -------- - catalyst.data.minute_bars.BcolzMinuteBarReader - """ - COL_NAMES = ('open', 'high', 'low', 'close', 'volume') - - def __init__(self, - rootdir, - calendar, - start_session, - end_session, - five_minutes_per_day, - default_ohlc_ratio=OHLC_RATIO, - ohlc_ratios_per_sid=None, - expectedlen=DEFAULT_EXPECTEDLEN_CRYPTO, - write_metadata=True): - - self._rootdir = rootdir - self._start_session = start_session - self._end_session = end_session - self._calendar = calendar - slicer = ( - calendar.schedule.index.slice_indexer(start_session, end_session)) - self._schedule = calendar.schedule[slicer] - self._session_labels = self._schedule.index - self._five_minutes_per_day = five_minutes_per_day - self._expectedlen = expectedlen - self._default_ohlc_ratio = default_ohlc_ratio - self._ohlc_ratios_per_sid = ohlc_ratios_per_sid - - self._five_minute_index = _calc_five_minute_index( - self._schedule.market_open, self._five_minutes_per_day) - - if write_metadata: - metadata = BcolzFiveMinuteBarMetadata( - self._default_ohlc_ratio, - self._ohlc_ratios_per_sid, - self._calendar, - self._start_session, - self._end_session, - self._five_minutes_per_day, - ) - metadata.write(self._rootdir) - - @classmethod - def open(cls, rootdir, end_session=None): - """ - Open an existing ``rootdir`` for writing. - - Parameters - ---------- - end_session : Timestamp (optional) - When appending, the intended new ``end_session``. - """ - metadata = BcolzFiveMinuteBarMetadata.read(rootdir) - return BcolzFiveMinuteBarWriter( - rootdir, - metadata.calendar, - metadata.start_session, - end_session if end_session is not None else metadata.end_session, - metadata.five_minutes_per_day, - metadata.default_ohlc_ratio, - metadata.ohlc_ratios_per_sid, - write_metadata=end_session is not None - ) - - @property - def first_trading_day(self): - return self._start_session - - def ohlc_ratio_for_sid(self, sid): - if self._ohlc_ratios_per_sid is not None: - try: - return self._ohlc_ratios_per_sid[sid] - except KeyError: - pass - - # If no ohlc_ratios_per_sid dict is passed, or if the specified - # sid is not in the dict, fallback to the general ohlc_ratio. - return self._default_ohlc_ratio - - def sidpath(self, sid): - """ - Parameters - ---------- - sid : int - Asset identifier. - - Returns - ------- - out : string - Full path to the bcolz rootdir for the given sid. - """ - sid_subdir = _sid_subdir_path(sid) - return join(self._rootdir, sid_subdir) - - def last_date_in_output_for_sid(self, sid): - """ - Parameters - ---------- - sid : int - Asset identifier. - - Returns - ------- - out : pd.Timestamp - The midnight of the last date written in to the output for the - given sid. - """ - sizes_path = "{0}/close/meta/sizes".format(self.sidpath(sid)) - if not os.path.exists(sizes_path): - return pd.NaT - with open(sizes_path, mode='r') as f: - sizes = f.read() - data = json.loads(sizes) - # use integer division so that the result is an int - # for pandas index later https://github.com/pandas-dev/pandas/blob/master/pandas/tseries/base.py#L247 # noqa - num_days = data['shape'][0] // self._five_minutes_per_day - if num_days == 0: - # empty container - return pd.NaT - return self._session_labels[num_days - 1] - - def _init_ctable(self, path): - """ - Create empty ctable for given path. - - Parameters - ---------- - path : string - The path to rootdir of the new ctable. - """ - # Only create the containing subdir on creation. - # This is not to be confused with the `.bcolz` directory, but is the - # directory up one level from the `.bcolz` directories. - sid_containing_dirname = os.path.dirname(path) - if not os.path.exists(sid_containing_dirname): - # Other sids may have already created the containing directory. - os.makedirs(sid_containing_dirname) - initial_array = np.empty(0, np.uint64) - table = ctable( - rootdir=path, - columns=[ - initial_array, - initial_array, - initial_array, - initial_array, - initial_array, - ], - names=[ - 'open', - 'high', - 'low', - 'close', - 'volume' - ], - expectedlen=self._expectedlen, - mode='w', - ) - table.flush() - return table - - def _ensure_ctable(self, sid): - """Ensure that a ctable exists for ``sid``, then return it.""" - sidpath = self.sidpath(sid) - if not os.path.exists(sidpath): - return self._init_ctable(sidpath) - return bcolz.ctable(rootdir=sidpath, mode='a') - - def _zerofill(self, table, numdays): - # Compute the number of minutes to be filled, accounting for the - # possibility of a partial day's worth of minutes existing for - # the previous day. - five_minute_offset = len(table) % self._five_minutes_per_day - num_to_prepend = numdays * self._five_minutes_per_day - five_minute_offset - - prepend_array = np.zeros(num_to_prepend, np.uint64) - # Fill all OHLCV with zeros. - table.append([prepend_array] * 5) - table.flush() - - def pad(self, sid, date): - """ - Fill sid container with empty data through the specified date. - - If the last recorded trade is not at the close, then that day will be - padded with zeros until its close. Any day after that (up to and - including the specified date) will be padded with `minute_per_day` - worth of zeros - - Parameters - ---------- - sid : int - The asset identifier for the data being written. - date : datetime-like - The date used to calculate how many slots to be pad. - The padding is done through the date, i.e. after the padding is - done the `last_date_in_output_for_sid` will be equal to `date` - """ - table = self._ensure_ctable(sid) - - last_date = self.last_date_in_output_for_sid(sid) - - tds = self._session_labels - - if date <= last_date or date < tds[0]: - # No need to pad. - return - - if last_date == pd.NaT: - # If there is no data, determine how many days to add so that - # desired days are written to the correct slots. - days_to_zerofill = tds[tds.slice_indexer(end=date)] - else: - days_to_zerofill = tds[tds.slice_indexer( - start=last_date + tds.freq, - end=date)] - - self._zerofill(table, len(days_to_zerofill)) - - new_last_date = self.last_date_in_output_for_sid(sid) - assert new_last_date == date, "new_last_date={0} != date={1}".format( - new_last_date, date) - - def set_sid_attrs(self, sid, **kwargs): - """Write all the supplied kwargs as attributes of the sid's file. - """ - table = self._ensure_ctable(sid) - for k, v in kwargs.items(): - table.attrs[k] = v - - def write(self, - data, - length=None, - show_progress=False, - invalid_data_behavior='warn'): - """Write a stream of minute data. - - Parameters - ---------- - data : iterable[(int, pd.DataFrame)] - The data to write. Each element should be a tuple of sid, data - where data has the following format: - columns : ('open', 'high', 'low', 'close', 'volume') - open : float64 - high : float64 - low : float64 - close : float64 - volume : float64|int64 - index : DatetimeIndex of market minutes. - A given sid may appear more than once in ``data``; however, - the dates must be strictly increasing. - show_progress : bool, optional - Whether or not to show a progress bar while writing. - """ - with maybe_show_progress( - data, - length=length, - show_percent=False, - show_progress=show_progress, - item_show_func=item_show_count(length), - label='Compiling five-minute data', - ) as it: - write_sid = self.write_sid - for e in it: - write_sid(*e, invalid_data_behavior=invalid_data_behavior) - - def write_sid(self, sid, df, invalid_data_behavior='warn'): - """ - Write the OHLCV data for the given sid. - If there is no bcolz ctable yet created for the sid, create it. - If the length of the bcolz ctable is not exactly to the date before - the first day provided, fill the ctable with 0s up to that date. - - Parameters - ---------- - sid : int - The asset identifer for the data being written. - df : pd.DataFrame - DataFrame of market data with the following characteristics. - columns : ('open', 'high', 'low', 'close', 'volume') - open : float64 - high : float64 - low : float64 - close : float64 - volume : float64|int64 - index : DatetimeIndex of market minutes. - """ - cols = { - 'open': df.open.values, - 'high': df.high.values, - 'low': df.low.values, - 'close': df.close.values, - 'volume': df.volume.values, - } - dts = df.index.values - # Call internal method, since DataFrame has already ensured matching - # index and value lengths. - self._write_cols(sid, dts, cols, invalid_data_behavior) - - def write_cols(self, sid, dts, cols, invalid_data_behavior='warn'): - """ - Write the OHLCV data for the given sid. - If there is no bcolz ctable yet created for the sid, create it. - If the length of the bcolz ctable is not exactly to the date before - the first day provided, fill the ctable with 0s up to that date. - - Parameters - ---------- - sid : int - The asset identifier for the data being written. - dts : datetime64 array - The dts corresponding to values in cols. - cols : dict of str -> np.array - dict of market data with the following characteristics. - keys are ('open', 'high', 'low', 'close', 'volume') - open : float64 - high : float64 - low : float64 - close : float64 - volume : float64|int64 - """ - if not all(len(dts) == len(cols[name]) for name in self.COL_NAMES): - raise BcolzFiveMinuteWriterColumnMismatch( - "Length of dts={0} should match cols: {1}".format( - len(dts), - " ".join("{0}={1}".format(name, len(cols[name])) - for name in self.COL_NAMES))) - self._write_cols(sid, dts, cols, invalid_data_behavior) - - def _write_cols(self, sid, dts, cols, invalid_data_behavior): - """ - Internal method for `write_cols` and `write`. - - Parameters - ---------- - sid : int - The asset identifier for the data being written. - dts : datetime64 array - The dts corresponding to values in cols. - cols : dict of str -> np.array - dict of market data with the following characteristics. - keys are ('open', 'high', 'low', 'close', 'volume') - open : float64 - high : float64 - low : float64 - close : float64 - volume : float64|int64 - """ - table = self._ensure_ctable(sid) - - tds = self._session_labels - input_first_day = self._calendar.minute_to_session_label( - pd.Timestamp(dts[0]), direction='previous') - - last_date = self.last_date_in_output_for_sid(sid) - - day_before_input = input_first_day - tds.freq - - self.pad(sid, day_before_input) - table = self._ensure_ctable(sid) - - # Get the number of minutes already recorded in this sid's ctable - num_rec_mins = table.size - - all_minutes = self._five_minute_index - # Get the latest minute we wish to write to the ctable - last_minute_to_write = pd.Timestamp(dts[-1], tz='UTC') - - #print 'all_minutes[-1]:', all_minutes[num_rec_mins-1] - #print 'last_minute_to_write:', last_minute_to_write - - # In the event that we've already written some minutely data to the - # ctable, guard against overwriting that data. - if num_rec_mins > 0: - last_recorded_minute = all_minutes[num_rec_mins - 1] - if last_minute_to_write <= last_recorded_minute: - raise BcolzMinuteOverlappingData(dedent(""" - Data with last_date={0} already includes input start={1} for - sid={2}""".strip()).format(last_date, input_first_day, sid)) - - latest_min_count = all_minutes.get_loc(last_minute_to_write) - - # Get all the minutes we wish to write (all market minutes after the - # latest currently written, up to and including last_minute_to_write) - all_minutes_in_window = all_minutes[num_rec_mins:latest_min_count + 1] - - minutes_count = all_minutes_in_window.size - - open_col = np.zeros(minutes_count, dtype=uint64) - high_col = np.zeros(minutes_count, dtype=uint64) - low_col = np.zeros(minutes_count, dtype=uint64) - close_col = np.zeros(minutes_count, dtype=uint64) - vol_col = np.zeros(minutes_count, dtype=uint64) - - dt_ixs = np.searchsorted(all_minutes_in_window.values, - dts.astype('datetime64[ns]')) - - ohlc_ratio = self.ohlc_ratio_for_sid(sid) - - ( - open_col[dt_ixs], - high_col[dt_ixs], - low_col[dt_ixs], - close_col[dt_ixs], - vol_col[dt_ixs], - ) = convert_cols(cols, ohlc_ratio, sid, invalid_data_behavior) - - table.append([ - open_col, - high_col, - low_col, - close_col, - vol_col - ]) - table.flush() - - def data_len_for_day(self, day): - """ - Return the number of data points up to and including the - provided day. - """ - day_ix = self._session_labels.get_loc(day) - # Add one to the 0-indexed day_ix to get the number of days. - num_days = day_ix + 1 - return num_days * self._five_minutes_per_day - - def truncate(self, date): - """Truncate data beyond this date in all ctables.""" - truncate_slice_end = self.data_len_for_day(date) - - glob_path = os.path.join(self._rootdir, "*", "*", "*.bcolz") - sid_paths = sorted(glob(glob_path)) - - for sid_path in sid_paths: - file_name = os.path.basename(sid_path) - - try: - table = bcolz.open(rootdir=sid_path) - except IOError: - continue - if table.len <= truncate_slice_end: - logger.info("{0} not past truncate date={1}.", file_name, date) - continue - - logger.info( - "Truncating {0} at end_date={1}", file_name, date.date() - ) - - table.resize(truncate_slice_end) - - # Update end session in metadata. - metadata = BcolzFiveMinuteBarMetadata.read(self._rootdir) - metadata.end_session = date - metadata.write(self._rootdir) - - -class BcolzFiveMinuteBarReader(FiveMinuteBarReader): - """ - Reader for data written by BcolzFiveMinuteBarWriter - - Parameters - ---------- - rootdir : string - The root directory containing the metadata and asset bcolz - directories. - - See Also - -------- - catalyst.data.minute_bars.BcolzFiveMinuteBarWriter - """ - FIELDS = ('open', 'high', 'low', 'close', 'volume') - - def __init__(self, rootdir, sid_cache_size=1000): - self._rootdir = rootdir - - metadata = self._get_metadata() - - self._start_session = metadata.start_session - self._end_session = metadata.end_session - - self.calendar = metadata.calendar - slicer = self.calendar.schedule.index.slice_indexer( - self._start_session, - self._end_session, - ) - self._schedule = self.calendar.schedule[slicer] - self._market_opens = self._schedule.market_open - self._market_open_values = self._market_opens.values.\ - astype('datetime64[m]').astype(np.int64) - self._market_closes = self._schedule.market_close - self._market_close_values = self._market_closes.values.\ - astype('datetime64[m]').astype(np.int64) - - self._default_ohlc_inverse = 1.0 / metadata.default_ohlc_ratio - ohlc_ratios = metadata.ohlc_ratios_per_sid - if ohlc_ratios: - self._ohlc_inverses_per_sid = ( - valmap(lambda x: 1.0 / x, ohlc_ratios)) - else: - self._ohlc_inverses_per_sid = None - - self._five_minutes_per_day = metadata.five_minutes_per_day - - self._carrays = { - field: LRU(sid_cache_size) - for field in self.FIELDS - } - - self._last_get_value_dt_position = None - self._last_get_value_dt_value = None - - # This is to avoid any bad data or other performance-killing situation - # where there a consecutive streak of 0 (no volume) starting at an - # asset's start date. - # if asset 1 started on 2015-01-03 but its first trade is 2015-01-06 - # 10:31 AM US/Eastern, this dict would store {1: 23675971}, - # which is the minute epoch of that date. - self._known_zero_volume_dict = {} - - def _get_metadata(self): - return BcolzFiveMinuteBarMetadata.read(self._rootdir) - - @property - def trading_calendar(self): - return self.calendar - - @lazyval - def last_available_dt(self): - _, close = self.calendar.open_and_close_for_session(self._end_session) - return close - - @property - def first_trading_day(self): - return self._start_session - - def _ohlc_ratio_inverse_for_sid(self, sid): - if self._ohlc_inverses_per_sid is not None: - try: - return self._ohlc_inverses_per_sid[sid] - except KeyError: - pass - - # If we can not get a sid-specific OHLC inverse for this sid, - # fallback to the default. - return self._default_ohlc_inverse - - def _minutes_to_exclude(self): - """ - Calculate the minutes which should be excluded when a window - occurs on days which had an early close, i.e. days where the close - based on the regular period of minutes per day and the market close - do not match. - - Returns - ------- - List of DatetimeIndex representing the minutes to exclude because - of early closes. - """ - market_opens = self._market_opens.values.astype('datetime64[m]') - market_closes = self._market_closes.values.astype('datetime64[m]') - minutes_per_day = (market_closes - market_opens).astype(np.int64) / 5 - early_indices = np.where( - minutes_per_day != self._five_minutes_per_day - 1)[0] - early_opens = self._market_opens[early_indices] - early_closes = self._market_closes[early_indices] - minutes = [(market_open, early_close) - for market_open, early_close - in zip(early_opens, early_closes)] - return minutes - - @lazyval - def _minute_exclusion_tree(self): - """ - Build an interval tree keyed by the start and end of each range - of positions should be dropped from windows. (These are the minutes - between an early close and the minute which would be the close based - on the regular period if there were no early close.) - The value of each node is the same start and end position stored as - a tuple. - - The data is stored as such in support of a fast answer to the question, - does a given start and end position overlap any of the exclusion spans? - - Returns - ------- - IntervalTree containing nodes which represent the minutes to exclude - because of early closes. - """ - itree = IntervalTree() - for market_open, early_close in self._minutes_to_exclude(): - start_pos = self._find_position_of_five_minute(early_close) + 1 - end_pos = ( - self._find_position_of_five_minute(market_open) - + - self._five_minutes_per_day - - - 1 - ) - data = (start_pos, end_pos) - itree[start_pos:end_pos + 1] = data - return itree - - def _exclusion_indices_for_range(self, start_idx, end_idx): - """ - Returns - ------- - List of tuples of (start, stop) which represent the ranges of minutes - which should be excluded when a market minute window is requested. - """ - itree = self._minute_exclusion_tree - if itree.overlaps(start_idx, end_idx): - ranges = [] - intervals = itree[start_idx:end_idx] - for interval in intervals: - ranges.append(interval.data) - return sorted(ranges) - else: - return None - - def _get_carray_path(self, sid, field): - sid_subdir = _sid_subdir_path(sid) - # carrays are subdirectories of the sid's rootdir - return os.path.join(self._rootdir, sid_subdir, field) - - def _open_minute_file(self, field, sid): - sid = int(sid) - - try: - carray = self._carrays[field][sid] - except KeyError: - carray = self._carrays[field][sid] = \ - bcolz.carray(rootdir=self._get_carray_path(sid, field), - mode='r') - - return carray - - def table_len(self, sid): - """Returns the length of the underlying table for this sid.""" - return len(self._open_minute_file('close', sid)) - - def get_sid_attr(self, sid, name): - sid_subdir = _sid_subdir_path(sid) - sid_path = os.path.join(self._rootdir, sid_subdir) - attrs = bcolz.attrs.attrs(sid_path, 'r') - try: - return attrs[name] - except KeyError: - return None - - def get_value(self, sid, dt, field): - """ - Retrieve the pricing info for the given sid, dt, and field. - - Parameters - ---------- - sid : int - Asset identifier. - dt : datetime-like - The datetime at which the trade occurred. - field : string - The type of pricing data to retrieve. - ('open', 'high', 'low', 'close', 'volume') - - Returns - ------- - out : float|int - - The market data for the given sid, dt, and field coordinates. - - For OHLC: - Returns a float if a trade occurred at the given dt. - If no trade occurred, a np.nan is returned. - - For volume: - Returns the integer value of the volume. - (A volume of 0 signifies no trades for the given dt.) - """ - if self._last_get_value_dt_value == dt.value: - minute_pos = self._last_get_value_dt_position - else: - try: - minute_pos = self._find_position_of_five_minute(dt) - except ValueError: - raise NoDataOnDate() - - self._last_get_value_dt_value = dt.value - self._last_get_value_dt_position = minute_pos - - try: - value = self._open_minute_file(field, sid)[minute_pos] - except IndexError: - value = 0 - if value == 0: - if field == 'volume': - return 0 - else: - return np.nan - - if field != 'volume': - value *= self._ohlc_ratio_inverse_for_sid(sid) - return value - - def get_last_traded_dt(self, asset, dt): - minute_pos = self._find_last_traded_five_minute_position(asset, dt) - if minute_pos == -1: - return pd.NaT - return self._pos_to_minute(minute_pos) - - def _find_last_traded_five_minute_position(self, asset, dt): - volumes = self._open_minute_file('volume', asset) - start_date_minute = asset.start_date.value / NANOS_IN_FIVE_MINUTE - dt_minute = dt.value / NANOS_IN_FIVE_MINUTE - - try: - # if we know of a dt before which this asset has no volume, - # don't look before that dt - earliest_dt_to_search = self._known_zero_volume_dict[asset.sid] - except KeyError: - earliest_dt_to_search = start_date_minute - - if dt_minute < earliest_dt_to_search: - return -1 - - pos = find_last_traded_five_minute_position_internal( - self._market_open_values, - self._market_close_values, - dt_minute, - earliest_dt_to_search, - volumes, - self._five_minutes_per_day, - ) - - if pos == -1: - # if we didn't find any volume before this dt, save it to avoid - # work in the future. - try: - self._known_zero_volume_dict[asset.sid] = max( - dt_minute, - self._known_zero_volume_dict[asset.sid] - ) - except KeyError: - self._known_zero_volume_dict[asset.sid] = dt_minute - - return pos - - def _pos_to_minute(self, pos): - minute_epoch = five_minute_value( - self._market_open_values, - pos, - self._five_minutes_per_day - ) - - return pd.Timestamp(minute_epoch, tz='UTC', unit="m") - - def _find_position_of_five_minute(self, minute_dt): - """ - Internal method that returns the position of the given minute in the - list of every trading minute since market open of the first trading - day. Adjusts non market minutes to the last close. - - ex. this method would return 1 for 2002-01-02 9:32 AM Eastern, if - 2002-01-02 is the first trading day of the dataset. - - Parameters - ---------- - minute_dt: pd.Timestamp - The minute whose position should be calculated. - - Returns - ------- - int: The position of the given minute in the list of all trading - minutes since market open on the first trading day. - """ - return find_position_of_five_minute( - self._market_open_values, - self._market_close_values, - minute_dt.value / NANOS_IN_FIVE_MINUTE, - self._five_minutes_per_day, - False, - ) - - def load_raw_arrays(self, fields, start_dt, end_dt, sids): - """ - Parameters - ---------- - fields : list of str - 'open', 'high', 'low', 'close', or 'volume' - start_dt: Timestamp - Beginning of the window range. - end_dt: Timestamp - End of the window range. - sids : list of int - The asset identifiers in the window. - - Returns - ------- - list of np.ndarray - A list with an entry per field of ndarrays with shape - (minutes in range, sids) with a dtype of float64, containing the - values for the respective field over start and end dt range. - """ - start_idx = self._find_position_of_five_minute(start_dt) - end_idx = self._find_position_of_five_minute(end_dt) - - num_minutes = (end_idx - start_idx + 1) - - results = [] - - indices_to_exclude = self._exclusion_indices_for_range( - start_idx, end_idx) - if indices_to_exclude is not None: - for excl_start, excl_stop in indices_to_exclude: - length = excl_stop - excl_start + 1 - num_minutes -= length - - shape = num_minutes, len(sids) - - for field in fields: - if field != 'volume': - out = np.full(shape, np.nan) - else: - out = np.zeros(shape, dtype=uint64) - - for i, sid in enumerate(sids): - carray = self._open_minute_file(field, sid) - values = carray[start_idx:end_idx + 1] - if indices_to_exclude is not None: - for excl_start, excl_stop in indices_to_exclude[::-1]: - excl_slice = np.s_[ - excl_start - start_idx:excl_stop - start_idx + 1] - values = np.delete(values, excl_slice) - - where = values != 0 - # first slice down to len(where) because we might not have - # written data for all the minutes requested - if field != 'volume': - out[:len(where), i][where] = ( - values[where] * self._ohlc_ratio_inverse_for_sid(sid)) - else: - out[:len(where), i][where] = values[where] - - results.append(out) - return results - - -class MinuteBarUpdateReader(with_metaclass(ABCMeta, object)): - """ - Abstract base class for minute update readers. - """ - - @abstractmethod - def read(self, dts, sids): - """ - Read and return pricing update data. - - Parameters - ---------- - dts : DatetimeIndex - The minutes for which to read the pricing updates. - sids : iter[int] - The sids for which to read the pricing updates. - - Returns - ------- - data : iter[(int, DataFrame)] - Returns an iterable of ``sid`` to the corresponding OHLCV data. - """ - raise NotImplementedError() - - -class H5MinuteBarUpdateWriter(object): - """ - Writer for files containing minute bar updates for consumption by a writer - for a ``MinuteBarReader`` format. - - Parameters - ---------- - path : str - The destination path. - complevel : int, optional - The HDF5 complevel, defaults to ``5``. - complib : str, optional - The HDF5 complib, defaults to ``zlib``. - """ - - FORMAT_VERSION = 0 - - _COMPLEVEL = 5 - _COMPLIB = 'zlib' - - def __init__(self, path, complevel=None, complib=None): - self._complevel = complevel if complevel \ - is not None else self._COMPLEVEL - self._complib = complib if complib \ - is not None else self._COMPLIB - self._path = path - - def write(self, frames): - """ - Write the frames to the target HDF5 file, using the format used by - ``pd.Panel.to_hdf`` - - Parameters - ---------- - frames : iter[(int, DataFrame)] or dict[int -> DataFrame] - An iterable or other mapping of sid to the corresponding OHLCV - pricing data. - """ - with HDFStore(self._path, 'w', - complevel=self._complevel, complib=self._complib) \ - as store: - panel = pd.Panel.from_dict(dict(frames)) - panel.to_hdf(store, 'updates') - with tables.open_file(self._path, mode='r+') as h5file: - h5file.set_node_attr('/', 'version', 0) - - -class H5MinuteBarUpdateReader(MinuteBarUpdateReader): - """ - Reader for minute bar updates stored in HDF5 files. - - Parameters - ---------- - path : str - The path of the HDF5 file from which to source data. - """ - def __init__(self, path): - self._panel = pd.read_hdf(path) - - def read(self, dts, sids): - panel = self._panel[sids, dts, :] - return panel.iteritems() diff --git a/catalyst/exchange/data_portal_exchange.py b/catalyst/exchange/data_portal_exchange.py index 88808fa6..d2191920 100644 --- a/catalyst/exchange/data_portal_exchange.py +++ b/catalyst/exchange/data_portal_exchange.py @@ -20,9 +20,8 @@ from catalyst.assets._assets import TradingPair from logbook import Logger from catalyst.data.bundles.core import from_bundle_ingest_dirname, \ - minute_path, five_minute_path, daily_path + minute_path, daily_path from catalyst.data.data_portal import DataPortal -from catalyst.data.five_minute_bars import BcolzFiveMinuteBarReader from catalyst.data.minute_bars import BcolzMinuteBarReader from catalyst.data.us_equity_pricing import BcolzDailyBarReader from catalyst.exchange.exchange_errors import ( @@ -262,7 +261,6 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): self.daily_bar_readers = dict() self.minute_bar_readers = dict() - self.five_minute_bar_readers = dict() self.history_loaders = dict() self.minute_history_loaders = dict() @@ -333,7 +331,7 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): Pick from a collection of readers based of exchange name and frequency. :param data_frequency: - The reader frequency: minute, 5-minute, daily. + The reader frequency: minute, daily. :param exchange_name: The exchange name. diff --git a/catalyst/exchange/exchange.py b/catalyst/exchange/exchange.py index ade14dfe..0c4d7163 100644 --- a/catalyst/exchange/exchange.py +++ b/catalyst/exchange/exchange.py @@ -672,7 +672,7 @@ class Exchange: Retrieve OHLCV candles for the given assets :param data_frequency: - The candle frequency: minute, 5-minute or daily + The candle frequency: minute or daily :param assets: list[TradingPair] The targeted assets. :param bar_count: diff --git a/catalyst/exchange/exchange_bundle.py b/catalyst/exchange/exchange_bundle.py index df640311..9c3cccc1 100644 --- a/catalyst/exchange/exchange_bundle.py +++ b/catalyst/exchange/exchange_bundle.py @@ -5,7 +5,6 @@ import numpy as np from logbook import Logger, INFO from catalyst import get_calendar -from catalyst.data.five_minute_bars import BcolzFiveMinuteOverlappingData from catalyst.data.minute_bars import BcolzMinuteOverlappingData from catalyst.exchange.bitfinex.bitfinex import Bitfinex from catalyst.exchange.bittrex.bittrex import Bittrex @@ -161,8 +160,7 @@ def process_bar_data(exchange, assets, writer, data_frequency, show_progress=False, invalid_data_behavior='raise' ) - except (BcolzMinuteOverlappingData, - BcolzFiveMinuteOverlappingData) as e: + except BcolzMinuteOverlappingData as e: log.warn('chunk already exists {}: {}'.format(chunk, e)) @@ -215,7 +213,6 @@ def exchange_bundle(exchange_name, symbols=None, start=None, end=None, def ingest(environ, asset_db_writer, minute_bar_writer, - five_minute_bar_writer, daily_bar_writer, adjustment_writer, calendar, @@ -292,8 +289,6 @@ def exchange_bundle(exchange_name, symbols=None, start=None, end=None, # end=end # ) - # TODO: delete 5-minute writer everywhere - if minute_bar_writer is not None: process_bar_data( exchange=exchange, diff --git a/catalyst/exchange/poloniex/poloniex.py b/catalyst/exchange/poloniex/poloniex.py index 2a0b8474..4fc541de 100644 --- a/catalyst/exchange/poloniex/poloniex.py +++ b/catalyst/exchange/poloniex/poloniex.py @@ -29,6 +29,7 @@ from catalyst.exchange.exchange_execution import ExchangeLimitOrder, \ from catalyst.finance.order import Order, ORDER_STATUS from catalyst.protocol import Account from catalyst.exchange.exchange_utils import get_exchange_symbols_filename +from catalyst.finance.transaction import Transaction log = Logger('Poloniex') @@ -274,14 +275,12 @@ class Poloniex(Exchange): if(is_buy): response = self.api.buy(exchange_symbol, amount, price) else: - reponse = self.api.sell(exchange_symbol, amount, price) + response = self.api.sell(exchange_symbol, -amount, price) except Exception as e: raise ExchangeRequestError(error=e) date = pd.Timestamp.utcnow() - print(response) - if('orderNumber' in response): order_id = str(response['orderNumber']) order = Order( @@ -372,14 +371,19 @@ class Poloniex(Exchange): except Exception as e: raise OrphanOrderError(order_id=order_id, exchange=self.name) + return order + + # TODO: Need to decide whether we fetch orders locally or from exchnage + # The code below is ignored + try: response = self.api.returnopenorders(self.get_symbol(order.sid)) except Exception as e: raise ExchangeRequestError(error=e) - for order in response: - if(int(order['orderNumber'])==int(order_id)): - return True + for o in response: + if(int(o['orderNumber'])==int(order_id)): + return order return None @@ -392,23 +396,31 @@ class Poloniex(Exchange): order_param : str or Order The order_id or order object to cancel. """ - order_id = order_param.id \ - if isinstance(order_param, Order) else order_param + + if(isinstance(order_param, Order)): + order = order_param + else: + order = self._portfolio.open_orders[order_param] try: - response = self.api.cancelorder(order_id) + response = self.api.cancelorder(order.id) except Exception as e: raise ExchangeRequestError(error=e) if 'error' in response: - raise OrderCancelError( - order_id=order_id, + log.info('Unable to cancel order {order_id} on exchange {exchange} {error}.'.format( + order_id=order.id, exchange=self.name, error=response['error'] - ) - - self.portfolio.remove_order(order_param) #TODO: Verify this works + )) + #raise OrderCancelError( + # order_id=order.id, + # exchange=self.name, + # error=response['error'] + #) + + self.portfolio.remove_order(order) def tickers(self, assets): @@ -508,7 +520,7 @@ class Poloniex(Exchange): except Exception as e: raise ExchangeRequestError(error=e) - if(response['error']): + if('error' in response): if(not order_open): raise OrphanOrderReverseError(order_id=order_id, exchange=self.name) else: @@ -524,16 +536,17 @@ class Poloniex(Exchange): """ if(not filter(lambda item: item['order_id'] == tx['tradeID'], self.transactions[order_id])): log.debug('Got new transaction for order {}: amount {}, price {}'.format( - order_id, tx.amount, tx.rate)) + order_id, tx['amount'], tx['rate'])) + tx['amount']=float(tx['amount']) if(tx['type']=='sell'): tx['amount'] = -tx['amount'] transaction = Transaction( asset=order.asset, amount=tx['amount'], dt=pd.to_datetime(tx['date'], utc=True), - price=tx['rate'], + price=float(tx['rate']), order_id=tx['tradeID'], # it's a misnomer, but keeping it for compatibility - commission=tx['fee'] + commission=float(tx['fee']) ) self.transactions[order_id].append(transaction) self.portfolio.execute_transaction(transaction) diff --git a/catalyst/exchange/poloniex/poloniex_api.py b/catalyst/exchange/poloniex/poloniex_api.py index 3a181c07..4fe04972 100644 --- a/catalyst/exchange/poloniex/poloniex_api.py +++ b/catalyst/exchange/poloniex/poloniex_api.py @@ -15,6 +15,10 @@ class Poloniex_api(object): def __init__(self, key, secret): self.key = key self.secret = secret + + self.max_requests_per_second = 6 + self.request_cpt = dict() + self.public = ['returnTicker', 'return24Volume', 'returnOrderBook', 'returnTradeHistory', 'returnChartData', 'returnCurrencies', 'returnLoanOrders'] @@ -29,6 +33,43 @@ class Poloniex_api(object): 'cancelLoanOffer','returnOpenLoanOffers','returnActiveLoans', 'returnLendingHistory','toggleAutoRenew'] + def ask_request(self): + """ + Asks permission to issue a request to the exchange. + The primary purpose is to avoid hitting rate limits. + + The application will pause if the maximum requests per minute + permitted by the exchange is exceeded. + + :return boolean: + + """ + now = time.time() + if not self.request_cpt: + self.request_cpt = dict() + self.request_cpt[now] = 0 + return True + + cpt_date = self.request_cpt.keys()[0] + cpt = self.request_cpt[cpt_date] + + if now > cpt_date + 1: + self.request_cpt = dict() + self.request_cpt[now] = 0 + return True + + if cpt >= self.max_requests_per_second: + + log.debug('max requests 6 reached, sleeping for 1 seconds') + sleep(1) + + now = time.time() + self.request_cpt = dict() + self.request_cpt[now] = 0 + return True + else: + self.request_cpt[cpt_date] += 1 + def query(self, method, req={}): if method in self.public: @@ -45,6 +86,7 @@ class Poloniex_api(object): else: raise ValueError('Method "' + method + '" not found in neither the Public API or Trading API endpoints') + self.ask_request() req = urllib.request.Request(url, data=post_data, headers=headers) return json.loads(urlopen(req).read()) diff --git a/catalyst/finance/performance/tracker.py b/catalyst/finance/performance/tracker.py index 810f2c32..c6df9292 100644 --- a/catalyst/finance/performance/tracker.py +++ b/catalyst/finance/performance/tracker.py @@ -111,27 +111,11 @@ class PerformanceTracker(object): self.treasury_curves, self.trading_calendar ) - elif self.emission_rate == '5-minute': - self.all_benchmark_returns = pd.Series( - index=pd.date_range( - self.sim_params.first_open, - self.sim_params.last_close, - freq='5min' - ), - ) - self.cumulative_risk_metrics = \ - risk.RiskMetricsCumulative( - self.sim_params, - self.treasury_curves, - self.trading_calendar, - create_first_day_stats=True, - ) elif self.emission_rate == 'minute': self.all_benchmark_returns = pd.Series(index=pd.date_range( self.sim_params.first_open, self.sim_params.last_close, freq='Min') ) - self.cumulative_risk_metrics = \ risk.RiskMetricsCumulative( self.sim_params, diff --git a/catalyst/gens/sim_engine.pyx b/catalyst/gens/sim_engine.pyx index a318f292..aa3a9d51 100644 --- a/catalyst/gens/sim_engine.pyx +++ b/catalyst/gens/sim_engine.pyx @@ -20,9 +20,7 @@ cimport cython from cpython cimport bool cdef np.int64_t _nanos_in_minute = 60000000000 -cdef np.int64_t _nanos_in_five_minutes = 5 * _nanos_in_minute NANOS_IN_MINUTE = _nanos_in_minute -NANOS_IN_FIVE_MINUTES = _nanos_in_five_minutes cpdef enum: BAR = 0 @@ -117,24 +115,3 @@ cdef class MinuteSimulationClock: yield minute, BAR if minute_emission: yield minute, MINUTE_END - -cdef class FiveMinuteSimulationClock(MinuteSimulationClock): - @cython.boundscheck(False) - @cython.wraparound(False) - cdef dict calc_minutes_by_session(self): - cdef dict five_minutes_by_session - cdef int session_idx - cdef np.int64_t session_nano - cdef np.ndarray[np.int64_t, ndim=1] five_minutes_nanos - - five_minutes_by_session = {} - for session_idx, session_nano in enumerate(self.sessions_nanos): - five_minutes_nanos = np.arange( - self.market_opens_nanos[session_idx], - self.market_closes_nanos[session_idx], - _nanos_in_five_minutes - ) - five_minutes_by_session[session_nano] = pd.to_datetime( - five_minutes_nanos, utc=True, box=True - ) - return five_minutes_by_session diff --git a/catalyst/gens/tradesimulation.py b/catalyst/gens/tradesimulation.py index 1ef9dfbf..f7daa233 100644 --- a/catalyst/gens/tradesimulation.py +++ b/catalyst/gens/tradesimulation.py @@ -34,7 +34,6 @@ class AlgorithmSimulator(object): EMISSION_TO_PERF_KEY_MAP = { 'minute': 'minute_perf', - '5-minute': '5_minute_perf', 'daily': 'daily_perf' } @@ -202,7 +201,7 @@ class AlgorithmSimulator(object): stack.enter_context(self.processor) stack.enter_context(ZiplineAPI(self.algo)) - if algo.data_frequency in set(('minute', '5-minute')): + if algo.data_frequency == 'minute': def execute_order_cancellation_policy(): algo.blotter.execute_cancel_policy(SESSION_END) diff --git a/catalyst/pipeline/loaders/crypto_pricing_loader.py b/catalyst/pipeline/loaders/crypto_pricing_loader.py index 22c0ccf2..da98c988 100644 --- a/catalyst/pipeline/loaders/crypto_pricing_loader.py +++ b/catalyst/pipeline/loaders/crypto_pricing_loader.py @@ -41,10 +41,6 @@ class CryptoPricingLoader(PipelineLoader): reader = bundle.daily_bar_reader all_sessions = cal.all_sessions - elif data_frequency == '5-minute': - reader = bundle.five_minute_bar_reader - all_sessions = cal.all_five_minutes - elif data_frequency == 'minute': reader = bundle.minute_bar_reader all_sessions = cal.all_minutes diff --git a/catalyst/pipeline/loaders/equity_pricing_loader.py b/catalyst/pipeline/loaders/equity_pricing_loader.py index 4d9ec055..c7dbfd41 100644 --- a/catalyst/pipeline/loaders/equity_pricing_loader.py +++ b/catalyst/pipeline/loaders/equity_pricing_loader.py @@ -40,8 +40,6 @@ class USEquityPricingLoader(PipelineLoader): if data_frequency == 'daily': reader = bundle.daily_bar_reader - elif data_frequency == '5-minute': - reader = bundle.five_minute_bar_reader elif daily_bar_reader == 'minute': reader = bundle.minute_bar_reader else: @@ -53,9 +51,6 @@ class USEquityPricingLoader(PipelineLoader): if data_frequency == 'daily': all_sessions = cal.all_sessions - elif data_frequency == '5-minute': - reader = bundle.five_minute_bar_reader - all_sessions = cal.all_five_minutes elif daily_bar_reader == 'minute': reader = bundle.minute_bar_reader all_sessions = cal.all_minutes diff --git a/catalyst/sources/benchmark_source.py b/catalyst/sources/benchmark_source.py index 846b7eb5..05d5c601 100644 --- a/catalyst/sources/benchmark_source.py +++ b/catalyst/sources/benchmark_source.py @@ -65,19 +65,6 @@ class BenchmarkSource(object): ) self._precalculated_series = minute_series - elif self.emission_rate == '5-minute': - five_minutes = \ - trading_calendar.five_minutes_for_sessions_in_range( - sessions[0], - sessions[-1], - ) - - five_minute_series = daily_series.reindex( - index=five_minutes, - method='ffill', - ) - - self._precalculated_series = five_minute_series else: self._precalculated_series = daily_series else: @@ -168,21 +155,6 @@ class BenchmarkSource(object): ffill=True )[asset] - return benchmark_series.pct_change()[1:] - elif self.emission_rate == '5-minute': - five_minutes = trading_calendar.five_minutes_for_sessions_in_range( - self.sessions[0], self.sessions[-1] - ) - benchmark_series = data_portal.get_history_window( - [asset], - five_minutes[-1], - bar_count=len(five_minutes) + 1, - frequency='5m', - field='price', - data_frequency=self.emission_rate, - ffill=True, - )[asset] - return benchmark_series.pct_change()[1:] else: start_date = asset.start_date diff --git a/catalyst/utils/calendars/trading_calendar.py b/catalyst/utils/calendars/trading_calendar.py index a1489067..4748b1c2 100644 --- a/catalyst/utils/calendars/trading_calendar.py +++ b/catalyst/utils/calendars/trading_calendar.py @@ -117,9 +117,6 @@ class TradingCalendar(with_metaclass(ABCMeta)): self._trading_minutes_nanos = self.all_minutes.values.\ astype(np.int64) - - self._trading_five_minutes_nanos = self.all_five_minutes.values.\ - astype(np.int64) self.first_trading_session = _all_days[0] self.last_trading_session = _all_days[-1] @@ -182,18 +179,6 @@ class TradingCalendar(with_metaclass(ABCMeta)): """ return int(self._minutes_per_session[start_session:end_session].sum()) - @lazyval - def _five_minutes_per_session(self): - diff = self.schedule.market_close - self.schedule.market_open - diff = diff.astype('timedelta64[m]') - return (diff + 1) // 5 - - def five_minutes_count_for_sessions_in_range(self, - start_session, - end_session): - five_mins = self._five_minutes_per_session[start_session:end_session] - return int(five_mins.sum()) - @property def regular_holidays(self): """ @@ -386,10 +371,6 @@ class TradingCalendar(with_metaclass(ABCMeta)): idx = next_divider_idx(self._trading_minutes_nanos, dt.value) return self.all_minutes[idx] - def next_five_minute(self, dt): - idx = next_divider_idx(self._trading_five_minutes_nanos, dt.values) - return self.all_five_mintutes[idx] - def previous_minute(self, dt): """ Given a dt, return the previous exchange minute. @@ -484,12 +465,6 @@ class TradingCalendar(with_metaclass(ABCMeta)): end_minute=self.schedule.at[session_label, 'market_close'], ) - def five_minutes_for_session(self, session_label): - return self.five_minutes_in_range( - start_five_minute=self.schedule.at[session_label, 'market_open'], - end_five_minute=self.schedule.at[session_label, 'market_close'], - ) - def minutes_window(self, start_dt, count): start_dt_nanos = start_dt.value all_minutes_nanos = self._trading_minutes_nanos @@ -591,20 +566,6 @@ class TradingCalendar(with_metaclass(ABCMeta)): return abs(end_idx - start_idx) - def five_minutes_in_range(self, start_five_minute, end_five_minute): - start_idx = searchsorted(self._trading_five_minutes_nanos, - start_five_minute.value) - - end_idx = searchsorted(self._trading_five_minutes_nanos, - end_five_minute.value) - - if end_five_minute.value == self._trading_five_minutes_nanos[end_idx]: - # if the end minute is a market minute, increase by 1 - end_idx += 1 - - return self.all_five_minutes[start_idx:end_idx] - - def minutes_in_range(self, start_minute, end_minute): """ Given start and end minutes, return all the calendar minutes @@ -662,15 +623,6 @@ class TradingCalendar(with_metaclass(ABCMeta)): return self.minutes_in_range(first_minute, last_minute) - def five_minutes_for_sessions_in_range(self, - start_session_label, - end_session_label): - - first_minute, _ = self.open_and_close_for_session(start_session_label) - _, last_minute = self.open_and_close_for_session(end_session_label) - - return self.five_minutes_in_range(first_minute, last_minute) - def open_and_close_for_session(self, session_label): """ Returns a tuple of timestamps of the open and close of the session @@ -777,13 +729,6 @@ class TradingCalendar(with_metaclass(ABCMeta)): return DatetimeIndex(all_minutes).tz_localize("UTC") - @lazyval - def all_five_minutes(self): - """ - Returns a DatetimeIndex representing all the five minutes in this calendar. - """ - return self._all_minutes_with_interval(5) - @lazyval def all_minutes(self): """ diff --git a/catalyst/utils/events.py b/catalyst/utils/events.py index 3fc83dd4..10bb2f55 100644 --- a/catalyst/utils/events.py +++ b/catalyst/utils/events.py @@ -602,7 +602,6 @@ class date_rules(object): class time_rules(object): market_open = AfterOpen market_close = BeforeClose - every_5_minutes = Always every_minute = Always diff --git a/tests/exchange/test_bundle.py b/tests/exchange/test_bundle.py index 92a1ff30..6856970c 100644 --- a/tests/exchange/test_bundle.py +++ b/tests/exchange/test_bundle.py @@ -53,7 +53,6 @@ class ExchangeBundleTestCase: ingest(environ=os.environ, asset_db_writer=None, minute_bar_writer=minute_bar_writer, - five_minute_bar_writer=None, daily_bar_writer=None, adjustment_writer=None, calendar=open_calendar, diff --git a/tests/exchange/test_clock.py b/tests/exchange/test_clock.py index 94414d27..ff74986b 100644 --- a/tests/exchange/test_clock.py +++ b/tests/exchange/test_clock.py @@ -1,7 +1,7 @@ from unittest import TestCase from logbook import Logger from mock import patch, sentinel -from catalyst.exchange.exchange_clock import ExchangeClock +from catalyst.exchange.simple_clock import SimpleClock from catalyst.utils.calendars.trading_calendar import days_at_time from datetime import time from collections import defaultdict @@ -35,9 +35,9 @@ class ExchangeClockTestCase(TestCase): return self.internal_clock def test_clock(self): - with patch('catalyst.exchange.exchange_clock.pd.to_datetime') as to_dt, \ - patch('catalyst.exchange.exchange_clock.sleep') as sleep: - clock = ExchangeClock(sessions=self.sessions) + with patch('catalyst.exchange.simple_clock.pd.to_datetime') as to_dt, \ + patch('catalyst.exchange.simple_clock.sleep') as sleep: + clock = SimpleClock(sessions=self.sessions) to_dt.side_effect = self.get_clock sleep.side_effect = self.advance_clock start_time = pd.Timestamp.utcnow()