diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md index bc501c44..7d03b8fd 100644 --- a/.github/ISSUE_TEMPLATE.md +++ b/.github/ISSUE_TEMPLATE.md @@ -1,4 +1,4 @@ -Dear Zipline Maintainers, +Dear Catalyst Maintainers, Before I tell you about my issue, let me describe my environment: @@ -7,7 +7,7 @@ Before I tell you about my issue, let me describe my environment: * Operating System: (Windows Version or `$ uname --all`) * Python Version: `$ python --version` * Python Bitness: `$ python -c 'import math, sys;print(int(math.log(sys.maxsize + 1, 2) + 1))'` -* How did you install Zipline: (`pip`, `conda`, or `other (please explain)`) +* How did you install Catalyst: (`pip`, `conda`, or `other (please explain)`) * Python packages: `$ pip freeze` or `$ conda list` Now that you know a little about me, let me tell you about the issue I am diff --git a/README.rst b/README.rst index daff1270..3425bac6 100644 --- a/README.rst +++ b/README.rst @@ -1 +1 @@ -All the documentation for `Catalyst `_ can be found in the `catalyst-docs wiki `_. +All the documentation for `Catalyst `_ can be found in the `catalyst-docs wiki `_. \ No newline at end of file diff --git a/catalyst/__main__.py b/catalyst/__main__.py index c006df68..7dfe91b1 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -123,7 +123,7 @@ def ipython_only(option): ) @click.option( '--data-frequency', - type=click.Choice({'daily', 'minute'}), + type=click.Choice({'daily', '5-minute', 'minute'}), default='daily', show_default=True, help='The data frequency of the simulation.', @@ -290,6 +290,13 @@ def catalyst_magic(line, cell=None): show_default=True, help='The data bundle to ingest.', ) +@click.option( + '-c', + '--compile-locally', + is_flag=True, + default=False, + help='Download dataset from source and compile bundle locally.', +) @click.option( '--assets-version', type=int, @@ -301,7 +308,7 @@ def catalyst_magic(line, cell=None): default=True, help='Print progress information to the terminal.' ) -def ingest(bundle, assets_version, show_progress): +def ingest(bundle, compile_locally, assets_version, show_progress): """Ingest the data for the given bundle. """ bundles_module.ingest( @@ -310,6 +317,7 @@ def ingest(bundle, assets_version, show_progress): pd.Timestamp.utcnow(), assets_version, show_progress, + compile_locally, ) diff --git a/catalyst/algorithm.py b/catalyst/algorithm.py index a6fdcebf..ed10fedd 100644 --- a/catalyst/algorithm.py +++ b/catalyst/algorithm.py @@ -133,7 +133,10 @@ from catalyst.utils.security_list import SecurityList import catalyst.protocol from catalyst.sources.requests_csv import PandasRequestsCSV -from catalyst.gens.sim_engine import MinuteSimulationClock +from catalyst.gens.sim_engine import ( + MinuteSimulationClock, + FiveMinuteSimulationClock, +) from catalyst.sources.benchmark_source import BenchmarkSource from catalyst.catalyst_warnings import ZiplineDeprecationWarning @@ -170,7 +173,7 @@ class TradingAlgorithm(object): algo_filename : str, optional The filename for the algoscript. This will be used in exception tracebacks. default: ''. - data_frequency : {'daily', 'minute'}, optional + data_frequency : {'daily', '5-minute', 'minute'}, optional The duration of the bars. instant_fill : bool, optional Whether to fill orders immediately or on next bar. default: False @@ -223,7 +226,7 @@ class TradingAlgorithm(object): script : str Algoscript that contains initialize and handle_data function definition. - data_frequency : {'daily', 'minute'} + data_frequency : {'daily', '5-minute', 'minute'} The duration of the bars. capital_base : float How much capital to start with. @@ -305,7 +308,10 @@ class TradingAlgorithm(object): self.asset_finder = self.trading_environment.asset_finder # Initialize Pipeline API data. - self.init_engine(kwargs.pop('get_pipeline_loader', None)) + self.init_engine( + kwargs.pop('get_pipeline_loader', None), + self.sim_params.data_frequency, + ) self._pipelines = {} # Create an always-expired cache so that we compute the first time data # is requested. @@ -419,16 +425,28 @@ class TradingAlgorithm(object): self.restrictions = NoRestrictions() - def init_engine(self, get_loader): + def init_engine(self, get_loader, data_frequency): """ Construct and store a PipelineEngine from loader. If get_loader is None, constructs an ExplodingPipelineEngine """ if get_loader is not None: + if data_frequency == 'daily': + all_dates = self.trading_calendar.all_sessions + elif data_frequency == '5-minute': + all_dates = self.trading_calendar.all_five_minutes + elif data_frequency == 'minute': + all_dates = self.trading_calendar.all_minutes + else: + raise ValueError( + 'Cannot initialize engine with ' + 'data frequency: {}'.format(data_frequency) + ) + self.engine = SimplePipelineEngine( get_loader, - self.trading_calendar.all_sessions, + all_dates, self.asset_finder, ) else: @@ -449,7 +467,7 @@ class TradingAlgorithm(object): self._in_before_trading_start = True with handle_non_market_minutes(data) if \ - self.data_frequency == "minute" else ExitStack(): + self.data_frequency in ('minute', '5-minute') else ExitStack(): self._before_trading_start(self, data) self._in_before_trading_start = False @@ -505,10 +523,11 @@ class TradingAlgorithm(object): market_closes = trading_o_and_c['market_close'] minutely_emission = False - if self.sim_params.data_frequency == 'minute': + if self.sim_params.data_frequency in set(('minute', '5-minute')): market_opens = trading_o_and_c['market_open'] - minutely_emission = self.sim_params.emission_rate == "minute" + minutely_emission = self.sim_params.emission_rate in \ + set(('minute', '5-minute')) else: # in daily mode, we want to have one bar per session, timestamped # as the last minute of the session. @@ -528,10 +547,19 @@ class TradingAlgorithm(object): # FIXME generalize these values before_trading_start_minutes = days_at_time( self.sim_params.sessions, - time(8, 45), - "US/Eastern" + time(0, 0), + 'UTC', ) + if self.sim_params.data_frequency == '5-minute': + return FiveMinuteSimulationClock( + self.sim_params.sessions, + execution_opens, + execution_closes, + before_trading_start_minutes, + minute_emission=minutely_emission, + ) + return MinuteSimulationClock( self.sim_params.sessions, execution_opens, @@ -660,8 +688,11 @@ class TradingAlgorithm(object): # Assume data is daily if timestamp times are # standardized, otherwise assume minute bars. times = data.major_axis.time - if np.all(times == times[0]): + time_count = times.nunique() + if time_count == 1: self.sim_params.data_frequency = 'daily' + elif time_count == 288: + self.sim_params.data_frequency = '5-minute' else: self.sim_params.data_frequency = 'minute' @@ -683,6 +714,8 @@ class TradingAlgorithm(object): if self.sim_params.data_frequency == 'daily': equity_reader_arg = 'equity_daily_reader' + elif self.sim_params.data_frequency == '5-minute': + equity_daily_reader = 'equity_5_minute_reader' elif self.sim_params.data_frequency == 'minute': equity_reader_arg = 'equity_minute_reader' equity_reader = PanelBarReader( @@ -926,9 +959,9 @@ class TradingAlgorithm(object): The arena from the simulation parameters. This will normally be ``'backtest'`` but some systems may use this distinguish live trading from backtesting. - data_frequency : {'daily', 'minute'} + data_frequency : {'daily', '5-minute', 'minute'} data_frequency tells the algorithm if it is running with - daily data or minute data. + daily, minute, or five-minute mode. start : datetime The start date for the simulation. end : datetime @@ -1102,12 +1135,19 @@ class TradingAlgorithm(object): 'date_rule. You should use keyword argument ' 'time_rule= when calling schedule_function without ' 'specifying a date_rule', stacklevel=3) + + freq = self.sim_params.data_frequency date_rule = date_rule or date_rules.every_day() - time_rule = ((time_rule or time_rules.every_minute()) - if self.sim_params.data_frequency == 'minute' else - # If we are in daily mode the time_rule is ignored. - time_rules.every_minute()) + if freq is 'daily': + # ignore time rule in daily mode + time_rule = time_rules.every_minute() + else: + # use provided time rule or default to every minute or 5 minutes + # based on desired data frequency. + time_rule = time_rule or (time_rules.every_5_minutes() + if freq is '5-minute' else + time_rules.every_minute()) # Check the type of the algorithm's schedule before pulling calendar # Note that the ExchangeTradingSchedule is currently the only @@ -1782,7 +1822,7 @@ class TradingAlgorithm(object): @data_frequency.setter def data_frequency(self, value): - assert value in ('daily', 'minute') + assert value in ('daily', '5-minute', 'minute') self.sim_params.data_frequency = value @api_method diff --git a/catalyst/data/_minute_bar_internal.pyx b/catalyst/data/_minute_bar_internal.pyx index 61818ae1..9ebb0841 100644 --- a/catalyst/data/_minute_bar_internal.pyx +++ b/catalyst/data/_minute_bar_internal.pyx @@ -35,6 +35,17 @@ def minute_value(ndarray[long_t, ndim=1] market_opens, return market_opens[q] + r +@cython.cdivision(True) +def five_minute_value(ndarray[long_t, ndim=1] market_opens, + Py_ssize_t pos, + short five_minutes_per_day): + + cdef short q, r + q = cython.cdiv(pos, five_minutes_per_day) + r = cython.cmod(pos, five_minutes_per_day) + + return market_opens[q] + r + def find_position_of_minute(ndarray[long_t, ndim=1] market_opens, ndarray[long_t, ndim=1] market_closes, long_t minute_val, @@ -88,6 +99,26 @@ def find_position_of_minute(ndarray[long_t, ndim=1] market_opens, return (market_open_loc * minutes_per_day) + delta +def find_position_of_five_minute(ndarray[long_t, ndim=1] market_opens, + ndarray[long_t, ndim=1] market_closes, + long_t five_minute_val, + short five_minutes_per_day, + bool forward_fill): + + cdef Py_ssize_t market_open_loc, market_open, delta + + market_open_loc = \ + searchsorted(market_opens, five_minute_val, side='right') - 1 + market_open = market_opens[market_open_loc] + market_close = market_closes[market_open_loc] + + if not forward_fill and ((five_minute_val - market_open) >= five_minutes_per_day): + raise ValueError("Given five minutes is not between an open and a close") + + delta = int_min(five_minute_val - market_open, market_close - market_open) + + return (market_open_loc * five_minutes_per_day) + delta + def find_last_traded_position_internal( ndarray[long_t, ndim=1] market_opens, ndarray[long_t, ndim=1] market_closes, @@ -157,3 +188,51 @@ def find_last_traded_position_internal( # we've gone to the beginning of this asset's range, and still haven't # found a trade event return -1 + +def find_last_traded_five_minute_position_internal( + ndarray[long_t, ndim=1] market_opens, + ndarray[long_t, ndim=1] market_closes, + long_t end_five_minute, + long_t start_five_minute, + volumes, + short five_minutes_per_day): + cdef Py_ssize_t minute_pos, current_minute, q + + five_minute_pos = int_min( + find_position_of_five_minute( + market_opens, + market_closes, + end_five_minute, + five_minutes_per_day, + True, + ), + len(volumes) - 1, + ) + + while five_minute_pos >= 0: + current_five_minute = five_minute_value( + market_opens, five_minute_pos, five_minutes_per_day + ) + + q = cython.cdiv(five_minute_pos, five_minutes_per_day) + if current_five_minute > market_closes[q]: + five_minute_pos = find_position_of_five_minute( + market_opens, + market_closes, + market_closes[q], + five_minutes_per_day, + False, + ) + continue + + if current_five_minute < start_five_minute: + return -1 + + if volumes[five_minute_pos] != 0: + return five_minute_pos + + five_minute_pos -= 1 + + # we've gone to the beginning of this asset's range, and still haven't + # found a trade event + return -1 diff --git a/catalyst/data/bundles/base.py b/catalyst/data/bundles/base.py new file mode 100644 index 00000000..23640abd --- /dev/null +++ b/catalyst/data/bundles/base.py @@ -0,0 +1,524 @@ +# +# Copyright 2017 Enigma MPC, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from itertools import count +import tarfile +from time import time, sleep + +from abc import abstractmethod, abstractproperty +import logbook +import pandas as pd + +from . import core as bundles + +from catalyst.utils.cli import ( + item_show_count, + maybe_show_progress +) +from catalyst.utils.memoize import lazyval + +logbook.StderrHandler().push_application() +log = logbook.Logger(__name__) + +DEFAULT_RETRIES = 5 + +class BaseBundle(object): + def __init__(self, asset_filter=[]): + self._asset_filter = asset_filter + self._reset() + + def _reset(self): + self._splits = [] + self._dividends = [] + + @lazyval + def name(self): + raise NotImplementedError() + + @lazyval + def exchange(self): + raise NotImplementedError() + + @lazyval + def calendar_name(self): + raise NotImplementedError() + + @lazyval + def minutes_per_day(self): + raise NotImplementedError() + + @lazyval + def five_minutes_per_day(self): + raise NotImplementedError() + + @lazyval + def frequencies(self): + raise NotImplementedError() + + @lazyval + def md_column_names(self): + return _dtypes_to_cols(self.md_dtypes) + + @lazyval + def md_dtypes(self): + raise NotImplementedError() + + @lazyval + def column_names(self): + return _dtypes_to_cols(self.dtypes) + + @lazyval + def dtypes(self): + raise NotImplementedError() + + @lazyval + def tar_url(self): + raise NotImplementedError() + + @lazyval + def wait_time(self): + raise NotImplementedError() + + @abstractproperty + def splits(self): + raise NotImplementedError() + + @abstractproperty + def dividends(self): + raise NotImplementedError() + + @abstractmethod + def fetch_raw_metadata_frame(self, api_key, page_number): + raise NotImplementedError() + + def post_process_symbol_metadata(self, metadata, data): + return metadata + + @abstractmethod + def fetch_raw_symbol_frame(self, api_key, symbol, start_date, end_date): + raise NotImplementedError() + + def ingest(self, + environ, + asset_db_writer, + minute_bar_writer, + five_minute_bar_writer, + daily_bar_writer, + adjustment_writer, + calendar, + start_session, + end_session, + cache, + show_progress, + is_compile, + output_dir): + + try: + api_key = environ.get('CATALYST_API_KEY') + retries = environ.get('CATALYST_DOWNLOAD_ATTEMPTS', 5) + + if is_compile: + # User has instructed local compilation and ingestion of bundle. + # Fetch raw metadata for all symbols. + raw_metadata = self._fetch_metadata_frame( + api_key, + cache=cache, + retries=retries, + environ=environ, + show_progress=show_progress, + ) + + # Compile daily symbol data if bundle supports daily mode and + # persist the dataset to disk. + symbol_map = raw_metadata.symbol + if 'daily' in self.frequencies: + daily_bar_writer.write( + self._fetch_symbol_iter( + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + 'daily', + retries, + ), + assets=raw_metadata.index, + show_progress=show_progress, + ) + + # Post-process metadata using cached symbol frames, and write to + # disk. This metadata must be written before any attempt to write + # either minute or 5-minute data. + metadata = self._post_process_metadata( + raw_metadata, + cache, + show_progress=show_progress, + ) + asset_db_writer.write(metadata) + + # Compile 5-minute symbol data if bundle supports 5-minute mode and + # persist the dataset to disk. + ''' + if '5-minute' in self.frequencies: + five_minute_bar_writer.write( + self._fetch_symbol_iter( + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + '5-minute', + retries, + ), + length=len(symbol_map), + show_progress=show_progress, + ) + ''' + + # Compile minute symbol data if bundle supports minute mode and + # persist the dataset to disk. + if 'minute' in self.frequencies: + minute_bar_writer.write( + self._fetch_symbol_iter( + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + 'minute', + retries, + ), + show_progress=show_progress, + ) + + # For legacy purposes, this call is required to ensure the database + # contains an appropriately initialized file structure. We don't + # forsee a usecase for adjustments at this time, but may later + # choose to expose this functionality in the future. + adjustment_writer.write( + splits=( + pd.concat(self.splits, ignore_index=True) + if len(self.splits) > 0 else + None + ), + dividends=( + pd.concat(self.dividends, ignore_index=True) + if len(self.dividends) > 0 else + None + ), + ) + else: + # Otherwise, user has instructed to download and untar bundle + # directly from the bundles `tar_url`. + self._download_and_untar(show_progress, output_dir) + except Exception as e: + log.exception( + ' Failed to ingest {name}:\n{msg}'.format( + name=self.name, + msg=str(e), + ) + ) + else: + self._reset() + + def _download_and_untar(self, show_progress, output_dir): + # Download bundle conditioned on whether the user would like progress + # information to be displayed in the CLI. + if show_progress: + data = bundles.download_with_progress( + self.tar_url, + chunk_size=bundles.ONE_MEGABYTE, + label='Downloading {name} bundle'.format(name=self.name), + ) + else: + data = bundles.download_without_progress(self.tar_url) + + # File transfer has completed, untar the bundle to the appropriate + # data directory. + with tarfile.open('r', fileobj=data) as tar: + tar.extractall(output_dir) + + def _fetch_metadata_frame(self, + api_key, + cache, + retries=DEFAULT_RETRIES, + environ=None, + show_progress=False): + + # Setup raw metadata iterator to fetch pages if necessary. + raw_iter = self._fetch_metadata_iter(api_key, cache, retries, environ) + + # Concatenate all frame in iterator to compute a single metadata frame. + with maybe_show_progress( + raw_iter, + show_progress, + label='Fetching symbol metadata', + item_show_func=item_show_count(), + length=3, + show_percent=False, + ) as blocks: + metadata = pd.concat(blocks, ignore_index=True) + + return metadata + + def _fetch_metadata_iter(self, api_key, cache, retries, environ): + for page_number in count(1): + # Attempt to load metadata page from cache. If it does not exist, + # poll the API upto `retries` times in order to get raw DataFrame. + key = 'metadata-page-{pn}.frame'.format(pn=page_number) + try: + raw = cache[key] + except KeyError: + for _ in range(retries): + try: + raw = self.fetch_raw_metadata_frame( + api_key, + page_number, + ) + break + except ValueError as e: + raw = pd.DataFrame([]) + break + except Exception as e: + log.exception( + 'Failed to load metadata from {}. ' + 'Retrying.'.format(self.name) + ) + else: + raise ValueError( + 'Failed to download metadata page {} after {} ' + 'attempts.'.format(page_number, retries) + ) + + + if raw.empty: + # Empty DataFrame signals completion. + break + + # Apply selective asset filtering, useful for benchmark + # ingestion. + if self._asset_filter: + raw = raw[raw.symbol.isin(self._asset_filter)] + + # Update cached value for key. + cache[key] = raw + + # Return metadata frame to application. + yield raw + + def _post_process_metadata(self, metadata, cache, show_progress=False): + # Create empty data frame using target metadata column names and dtypes + final_metadata = pd.DataFrame( + columns=self.md_column_names, + index=metadata.index, + ) + + # Iterate over the available symbols, loading the asset's raw symbol + # data from the cache. The final metadata is computed and recorded in + # the appropriate row depending on the asset's id. + with maybe_show_progress( + metadata.symbol.iteritems(), + show_progress, + label='Post-processing symbol metadata', + item_show_func=item_show_count(len(metadata)), + length=len(metadata), + show_percent=False, + ) as symbols_map: + for asset_id, symbol in symbols_map: + # Attempt to load data from disk, the cache should have an entry + # for each symbol at this point of the execution. If one does + # not exist, we should fail. + key = '{sym}.daily.frame'.format(sym=symbol) + try: + raw_data = cache[key] + except KeyError: + raise ValueError( + 'Unable to find cached data for symbol: {0}'.format(symbol) + ) + + # Perform and require post-processing of metadata. + final_symbol_metadata = self.post_process_symbol_metadata( + asset_id, + metadata.iloc[asset_id], + raw_data, + ) + + # Record symbol's final metadata. + final_metadata.iloc[asset_id] = final_symbol_metadata + + # Register all assets with the bundle's default exchange. + final_metadata['exchange'] = self.exchange + + return final_metadata + + def _fetch_symbol_iter(self, + api_key, + cache, + symbol_map, + calendar, + start_session, + end_session, + data_frequency, + retries): + + for asset_id, symbol in symbol_map.iteritems(): + # Record start time of iteration, compare at end of iteration to + # adhere to the datas source's rate limit policy. + start_time = pd.Timestamp.utcnow() + + # Fetch new data if cached data is absent or stale, otherwise + # returns the cached data unaltered. The `should_sleep` flag + # indicates that an API call was attempted, and that we should be + # ensure aren't exceeding our rate limit before proceeding to the + # next symbol. If the raw_data is updated, it is cached before being + # returned. + raw_data, should_sleep = self._maybe_update_symbol_frame( + start_time, + api_key, + cache, + symbol, + calendar, + start_session, + end_session, + data_frequency, + retries, + ) + + # TODO(cfromknecht) further data validation? + + # Pass asset_id and symbol data to writer. + yield asset_id, raw_data + + # If an API call was made during this iteration and the time to + # reach this point was less than the inter-request `wait_time`, + # sleep until after enough time has elapsed to prevent getting rate + # limited. + if should_sleep: + remaining = pd.Timestamp.utcnow() - start_time + self.wait_time + if remaining.value > 0: + sleep(remaining.value / 10**9) + + def _maybe_update_symbol_frame(self, + start_time, + api_key, + cache, + symbol, + calendar, + start_session, + end_session, + data_frequency, + retries): + + # Attempt to load pre-existing symbol data from cache. + key = '{sym}.{freq}.frame'.format(sym=symbol, freq=data_frequency) + try: + raw_data = cache[key] + except KeyError: + raw_data = None + + # Select the most recent date in cached dataset if it exists, + # otherwise use the provided `start_session`. + last = start_session + if raw_data is not None and len(raw_data) > 0: + last = raw_data.index[-1].tz_localize('UTC') + + should_sleep = False + + # Determine time at which cached data will be considered stale. + cache_expiration = last + pd.Timedelta(days=2) + if start_time <= cache_expiration and raw_data is not None: + # Data is fresh enough to reuse, no need to update. Iterator can + # proceed to next symbol directly since no API call was required. + return raw_data, should_sleep + + # If we arrive here, we must have attempted an API call. + # Setting this flag tells the iterator to pause before starting + # the next asset, that we don't exceed the data source's rate + # limit. + should_sleep = True + + raw_data = self._fetch_symbol_frame( + api_key, + symbol, + calendar, + start_session, + end_session, + data_frequency, + retries=retries, + ) + + # Cache latest symbol data. + cache[key] = raw_data + + return raw_data, should_sleep + + def _fetch_symbol_frame(self, + api_key, + symbol, + calendar, + start_session, + end_session, + data_frequency, + retries=DEFAULT_RETRIES): + + # Data for symbol is old enough to attempt an update or is not + # present in the cache. Fetch raw data for a single symbol + # with requested intervals and frequency. Retry as necessary. + for _ in range(retries): + try: + raw_data = self.fetch_raw_symbol_frame( + api_key, + symbol, + calendar, + start_session, + end_session, + data_frequency, + ) + raw_data.index = pd.to_datetime(raw_data.index, utc=True) + raw_data.index = raw_data.index.tz_localize('UTC') + + # Filter incoming data to fit start and end sessions. + raw_data = raw_data[ + (raw_data.index >= start_session) & + (raw_data.index <= end_session) + ] + + # Filter out any duplicates entries, keep last one, since + # previous frame is probably an incomplete. + raw_data = raw_data[~raw_data.index.duplicated(keep='last')] + + return raw_data + + except Exception as e: + log.exception( + 'Exception raised fetching {name} data. Retrying.' + .format(name=self.name) + ) + else: + raise ValueError( + 'Failed to download data for symbol {sym} ' + 'after {n} attempts.'.format( + sym=symbol, + n=retries, + ) + ) + + +def _dtypes_to_cols(dtypes): + return [name for name, _ in dtypes] diff --git a/catalyst/data/bundles/base_pricing.py b/catalyst/data/bundles/base_pricing.py new file mode 100644 index 00000000..a0abd51a --- /dev/null +++ b/catalyst/data/bundles/base_pricing.py @@ -0,0 +1,80 @@ +# +# Copyright 2017 Enigma MPC, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from catalyst.data.bundles.base import BaseBundle +from catalyst.utils.memoize import lazyval + +class BasePricingBundle(BaseBundle): + @lazyval + def md_dtypes(self): + return [ + ('symbol', 'object'), + ('start_date', 'datetime64[ns]'), + ('end_date', 'datetime64[ns]'), + ('ac_date', 'datetime64[ns]'), + ] + + @lazyval + def dtypes(self): + return [ + ('date', 'datetime64[ns]'), + ('open', 'float64'), + ('high', 'float64'), + ('low', 'float64'), + ('close', 'float64'), + ('volume', 'float64'), + ] + +class BaseCryptoPricingBundle(BasePricingBundle): + @lazyval + def calendar_name(self): + return 'OPEN' + + @lazyval + def minutes_per_day(self): + return 1440 + + @lazyval + def five_minutes_per_day(self): + return 288 + + @property + def splits(self): + return [] + + @property + def dividends(self): + return [] + +class BaseEquityPricingBundle(BasePricingBundle): + @lazyval + def calendar_name(self): + return 'NYSE' + + @lazyval + def minutes_per_day(self): + return 390 + + @lazyval + def five_minutes_per_day(self): + return 78 + + @property + def splits(self): + return self._splits + + @property + def dividends(self): + return self._dividends diff --git a/catalyst/data/bundles/core.py b/catalyst/data/bundles/core.py index c3009f1d..29aceb7a 100644 --- a/catalyst/data/bundles/core.py +++ b/catalyst/data/bundles/core.py @@ -17,6 +17,10 @@ from ..us_equity_pricing import ( SQLiteAdjustmentReader, SQLiteAdjustmentWriter, ) +from ..five_minute_bars import ( + BcolzFiveMinuteBarReader, + BcolzFiveMinuteBarWriter, +) from ..minute_bars import ( BcolzMinuteBarReader, BcolzMinuteBarWriter, @@ -33,6 +37,7 @@ from catalyst.utils.input_validation import ensure_timestamp, optionally import catalyst.utils.paths as pth from catalyst.utils.preprocess import preprocess from catalyst.utils.calendars import get_calendar +from catalyst.utils.cli import maybe_show_progress ONE_MEGABYTE = 1024 * 1024 @@ -43,16 +48,21 @@ def asset_db_path(bundle_name, timestr, environ=None, db_version=None): ) -def minute_equity_path(bundle_name, timestr, environ=None): +def minute_path(bundle_name, timestr, environ=None): return pth.data_path( - minute_equity_relative(bundle_name, timestr, environ), + minute_relative(bundle_name, timestr, environ), environ=environ, ) - -def daily_equity_path(bundle_name, timestr, environ=None): +def five_minute_path(bundle_name, timestr, environ=None): return pth.data_path( - daily_equity_relative(bundle_name, timestr, environ), + five_minute_relative(bundle_name, timestr, environ), + environ=environ, + ) + +def daily_path(bundle_name, timestr, environ=None): + return pth.data_path( + daily_relative(bundle_name, timestr, environ), environ=environ, ) @@ -79,11 +89,13 @@ def cache_relative(bundle_name, timestr, environ=None): return bundle_name, '.cache' -def daily_equity_relative(bundle_name, timestr, environ=None): +def daily_relative(bundle_name, timestr, environ=None): return bundle_name, timestr, 'daily_equities.bcolz' +def five_minute_relative(bundle_name, timestr, environ=None): + return bundle_name, timestr, 'five_minute.bcolz' -def minute_equity_relative(bundle_name, timestr, environ=None): +def minute_relative(bundle_name, timestr, environ=None): return bundle_name, timestr, 'minute_equities.bcolz' @@ -158,7 +170,9 @@ def download_with_progress(url, chunk_size, **progress_kwargs): total_size = int(resp.headers['content-length']) data = BytesIO() - with click.progressbar(length=total_size, **progress_kwargs) as pbar: + + progress_kwargs['length'] = total_size + with maybe_show_progress(None, True, **progress_kwargs) as pbar: for chunk in resp.iter_content(chunk_size=chunk_size): data.write(chunk) pbar.update(len(chunk)) @@ -192,19 +206,20 @@ RegisteredBundle = namedtuple( 'start_session', 'end_session', 'minutes_per_day', + 'five_minutes_per_day', 'ingest', 'create_writers'] ) BundleData = namedtuple( 'BundleData', - 'asset_finder equity_minute_bar_reader equity_daily_bar_reader ' + 'asset_finder minute_bar_reader five_minute_bar_reader daily_bar_reader ' 'adjustment_reader', ) BundleCore = namedtuple( 'BundleCore', - 'bundles register unregister ingest load clean', + 'bundles register_bundle register unregister ingest load clean', ) @@ -258,6 +273,8 @@ def _make_bundle_core(): ------- bundles : mappingproxy The mapping of bundles to bundle payloads. + register_bundle : Bundle + A bundle instance to add to the ``bundles`` mapping. register : callable The function which registers new bundles in the ``bundles`` mapping. unregister : callable @@ -275,13 +292,31 @@ def _make_bundle_core(): # warn when trampling another bundle. bundles = mappingproxy(_bundles) + def register_bundle(bundle_cls, + asset_filter=None, + start_session=None, + end_session=None, + create_writers=True): + bundle = bundle_cls(asset_filter=asset_filter) + return register( + bundle.name, + bundle.ingest, + calendar_name=bundle.calendar_name, + minutes_per_day=bundle.minutes_per_day, + five_minutes_per_day=bundle.five_minutes_per_day, + start_session=start_session, + end_session=end_session, + create_writers=create_writers, + ) + @curry def register(name, f, - calendar_name='NYSE', + calendar_name='OPEN', start_session=None, end_session=None, - minutes_per_day=390, + minutes_per_day=1440, + five_minutes_per_day=288, create_writers=True): """Register a data bundle ingest function. @@ -362,6 +397,7 @@ def _make_bundle_core(): start_session=start_session, end_session=end_session, minutes_per_day=minutes_per_day, + five_minutes_per_day=five_minutes_per_day, ingest=f, create_writers=create_writers, ) @@ -393,7 +429,8 @@ def _make_bundle_core(): environ=os.environ, timestamp=None, assets_versions=(), - show_progress=False): + show_progress=False, + is_compile=False): """Ingest data for a given bundle. Parameters @@ -443,7 +480,7 @@ def _make_bundle_core(): pth.data_path([], environ=environ)) ) daily_bars_path = wd.ensure_dir( - *daily_equity_relative( + *daily_relative( name, timestr, environ=environ, ) ) @@ -457,10 +494,20 @@ def _make_bundle_core(): # when we create the SQLiteAdjustmentWriter below. The # SQLiteAdjustmentWriter needs to open the daily ctables so # that it can compute the adjustment ratios for the dividends. - daily_bar_writer.write(()) + + five_minute_bar_writer = BcolzFiveMinuteBarWriter( + wd.ensure_dir(*five_minute_relative( + name, timestr, environ=environ) + ), + calendar, + start_session, + end_session, + five_minutes_per_day=bundle.five_minutes_per_day, + ) + minute_bar_writer = BcolzMinuteBarWriter( - wd.ensure_dir(*minute_equity_relative( + wd.ensure_dir(*minute_relative( name, timestr, environ=environ) ), calendar, @@ -468,6 +515,7 @@ def _make_bundle_core(): end_session, minutes_per_day=bundle.minutes_per_day, ) + assets_db_path = wd.getpath(*asset_db_relative( name, timestr, environ=environ, )) @@ -484,6 +532,7 @@ def _make_bundle_core(): ) else: daily_bar_writer = None + five_minute_bar_writer = None minute_bar_writer = None asset_db_writer = None adjustment_db_writer = None @@ -495,6 +544,7 @@ def _make_bundle_core(): environ, asset_db_writer, minute_bar_writer, + five_minute_bar_writer, daily_bar_writer, adjustment_db_writer, calendar, @@ -502,6 +552,7 @@ def _make_bundle_core(): end_session, cache, show_progress, + is_compile, pth.data_path([name, timestr], environ=environ), ) @@ -577,11 +628,14 @@ def _make_bundle_core(): asset_finder=AssetFinder( asset_db_path(name, timestr, environ=environ), ), - equity_minute_bar_reader=BcolzMinuteBarReader( - minute_equity_path(name, timestr, environ=environ), + minute_bar_reader=BcolzMinuteBarReader( + minute_path(name, timestr, environ=environ), ), - equity_daily_bar_reader=BcolzDailyBarReader( - daily_equity_path(name, timestr, environ=environ), + five_minute_bar_reader=BcolzFiveMinuteBarReader( + five_minute_path(name, timestr, environ=environ), + ), + daily_bar_reader=BcolzDailyBarReader( + daily_path(name, timestr, environ=environ), ), adjustment_reader=SQLiteAdjustmentReader( adjustment_db_path(name, timestr, environ=environ), @@ -670,7 +724,15 @@ def _make_bundle_core(): return cleaned - return BundleCore(bundles, register, unregister, ingest, load, clean) + return BundleCore( + bundles, + register_bundle, + register, + unregister, + ingest, + load, + clean, + ) -bundles, register, unregister, ingest, load, clean = _make_bundle_core() +bundles, register_bundle, register, unregister, ingest, load, clean = _make_bundle_core() diff --git a/catalyst/data/bundles/poloniex.py b/catalyst/data/bundles/poloniex.py index 3303a7a8..430da87c 100644 --- a/catalyst/data/bundles/poloniex.py +++ b/catalyst/data/bundles/poloniex.py @@ -1,38 +1,167 @@ -from io import BytesIO -import tarfile +# +# Copyright 2017 Enigma MPC, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -from . import core as bundles +from datetime import datetime -POLONIEX_BUNDLE_URL = ( - 'https://www.dropbox.com/s/9naqffawnq8o4r2/poloniex-bundle.tar?dl=1' -) +import pandas as pd -@bundles.register( - 'poloniex', - create_writers=False, - calendar_name='OPEN', - minutes_per_day=1440) -def quantopian_quandl_bundle(environ, - asset_db_writer, - minute_bar_writer, - daily_bar_writer, - adjustment_writer, - calendar, - start_session, - end_session, - cache, - show_progress, - output_dir): - if show_progress: - data = bundles.download_with_progress( - POLONIEX_BUNDLE_URL, - chunk_size=bundles.ONE_MEGABYTE, - label="Downloading Bundle: poloniex", +from six.moves.urllib.parse import urlencode + +from catalyst.data.bundles.core import register_bundle +from catalyst.data.bundles.base_pricing import BaseCryptoPricingBundle +from catalyst.utils.memoize import lazyval + +class PoloniexBundle(BaseCryptoPricingBundle): + @lazyval + def name(self): + return 'poloniex' + + @lazyval + def exchange(self): + return 'POLO' + + @lazyval + def frequencies(self): + return set(( + 'daily', + #'5-minute', + )) + + @lazyval + def tar_url(self): + return ( + 'https://www.dropbox.com/s/9naqffawnq8o4r2/' + 'poloniex-bundle.tar?dl=1' ) - else: - data = bundles.download_without_progress(POLONIEX_BUNDLE_URL) - with tarfile.open('r', fileobj=data) as tar: - if show_progress: - print("Writing data to %s." % output_dir) - tar.extractall(output_dir) + @lazyval + def wait_time(self): + return pd.Timedelta(milliseconds=170) + + def fetch_raw_metadata_frame(self, api_key, page_number): + if page_number > 1: + return pd.DataFrame([]) + + raw = pd.read_json( + self._format_metadata_url( + api_key, + page_number, + ), + orient='index', + ) + + raw = raw.sort_index().reset_index() + raw.rename( + columns={'index':'symbol'}, + inplace=True, + ) + + raw = raw[raw['isFrozen'] == 0] + + return raw + + def post_process_symbol_metadata(self, asset_id, sym_md, sym_data): + start_date = sym_data.index[0] + end_date = sym_data.index[-1] + ac_date = end_date + pd.Timedelta(days=1) + + return ( + sym_md.symbol, + start_date, + end_date, + ac_date, + ) + + def fetch_raw_symbol_frame(self, + api_key, + symbol, + calendar, + start_date, + end_date, + frequency): + raw = pd.read_json( + self._format_data_url( + api_key, + symbol, + start_date, + end_date, + frequency, + ), + orient='records', + ) + raw.set_index('date', inplace=True) + + scale = 1 + raw.loc[:, 'open'] /= scale + raw.loc[:, 'high'] /= scale + raw.loc[:, 'low'] /= scale + raw.loc[:, 'close'] /= scale + raw.loc[:, 'volume'] *= scale + + return raw + + ''' + HELPER METHODS + ''' + + def _format_metadata_url(self, api_key, page_number): + query_params = [ + ('command', 'returnTicker'), + ] + + return self._format_polo_query(query_params) + + + def _format_data_url(self, + api_key, + symbol, + start_date, + end_date, + data_frequency): + period_map = { + 'daily': 86400, +# '5-minute': 300, + } + + try: + period = period_map[data_frequency] + except KeyError: + return None + + query_params = [ + ('command', 'returnChartData'), + ('currencyPair', symbol), + ('start', start_date.value / 10**9), + ('end', end_date.value / 10**9), + ('period', period), + ] + + return self._format_polo_query(query_params) + + def _format_polo_query(self, query_params): + return 'https://poloniex.com/public?{query}'.format( + query=urlencode(query_params), + ) + +''' +As a second parameter, you can pass an array of currency pairs +that will be processed as an asset_filter to only process that +subset of assets in the bundle, such as: +register_bundle(PoloniexBundle, ['USDT_BTC',]) + +For a production environment make sure to use (to bundle all pairs): +register_bundle(PoloniexBundle) +''' +register_bundle(PoloniexBundle) diff --git a/catalyst/data/bundles/quandl.py b/catalyst/data/bundles/quandl.py index 5a3a9dae..fc7a40c7 100644 --- a/catalyst/data/bundles/quandl.py +++ b/catalyst/data/bundles/quandl.py @@ -1,3 +1,28 @@ +# +# Copyright 2017 Enigma MPC, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +import pandas as pd + +from six.moves.urllib.parse import urlencode + +from catalyst.data.bundles.core import register_bundle +from catalyst.data.bundles.base_pricing import BaseEquityPricingBundle +from catalyst.utils.memoize import lazyval + """ Module for building a complete daily dataset from Quandl's WIKI dataset. """ @@ -17,350 +42,190 @@ from . import core as bundles log = Logger(__name__) seconds_per_call = (pd.Timedelta('10 minutes') / 2000).total_seconds() -# Invalid symbols that quandl has had in its metadata: -excluded_symbols = frozenset({'TEST123456789'}) +class QuandlBundle(BaseEquityPricingBundle): + @lazyval + def name(self): + return 'quandl' -def _fetch_raw_metadata(api_key, cache, retries, environ): - """Generator that yields each page of data from the metadata endpoint - as a dataframe. - """ - for page_number in count(1): - key = 'metadata-page-%d' % page_number - try: - raw = cache[key] - except KeyError: - for _ in range(retries): - try: - raw = pd.read_csv( - format_metadata_url(api_key, page_number), - date_parser=pd.tseries.tools.to_datetime, - parse_dates=[ - 'oldest_available_date', - 'newest_available_date', - ], - dtypes={ - 'dataset_code': 'int', - 'name': 'str', - 'oldest_available_date': 'str', - 'newest_available_date': 'str', - }, - usecols=[ - 'dataset_code', - 'name', - 'oldest_available_date', - 'newest_available_date', - ], - ) - break - except ValueError: - # when we are past the last page we will get a value - # error because there will be no columns - raw = pd.DataFrame([]) - break - except Exception: - pass - else: - raise ValueError( - 'Failed to download metadata page %d after %d' - ' attempts.' % (page_number, retries), - ) + @lazyval + def exchange(self): + return 'QUANDL' - cache[key] = raw + @lazyval + def frequencies(self): + return set(('daily',)) - if raw.empty: - # use the empty dataframe to signal completion - break - yield raw + @lazyval + def tar_url(self): + return 'https://s3.amazonaws.com/quantopian-public-zipline-data/quandl' + @lazyval + def wait_time(self): + return pd.Timedelta(milliseconds=300) -def fetch_symbol_metadata_frame(api_key, - cache, - retries=5, - environ=None, - show_progress=False): - """ - Download Quandl symbol metadata. + @lazyval + def _excluded_symbols(self): + """ + Invalid symbols that quandl has had in its metadata: + """ + return frozenset({'TEST123456789'}) - Parameters - ---------- - api_key : str - The quandl api key to use. If this is None then no api key will be - sent. - cache : DataFrameCache - The cache to use for persisting the intermediate data. - retries : int, optional - The number of times to retry each request before failing. - environ : mapping[str -> str], optional - The environment to use to find the catalyst home. By default this - is ``os.environ``. - show_progress : bool, optional - Show a progress bar for the download of this data. - - Returns - ------- - metadata_frame : pd.DataFrame - A dataframe with the following columns: - symbol: the asset's symbol - name: the full name of the asset - start_date: the first date of data for this asset - end_date: the last date of data for this asset - auto_close_date: end_date + one day - exchange: the exchange for the asset; this is always 'quandl' - The index of the dataframe will be used for symbol->sid mappings but - otherwise does not have specific meaning. - """ - raw_iter = _fetch_raw_metadata(api_key, cache, retries, environ) - - def item_show_func(_, _it=iter(count())): - 'Downloading page: %d' % next(_it) - - with maybe_show_progress(raw_iter, - show_progress, - item_show_func=item_show_func, - label='Downloading WIKI metadata: ') as blocks: - data = pd.concat(blocks, ignore_index=True).rename(columns={ - 'dataset_code': 'symbol', - 'name': 'asset_name', - 'oldest_available_date': 'start_date', - 'newest_available_date': 'end_date', - }).sort_values('symbol') - - data = data[~data.symbol.isin(excluded_symbols)] - # cut out all the other stuff in the name column - # we need to escape the paren because it is actually splitting on a regex - data.asset_name = data.asset_name.str.split(r' \(', 1).str.get(0) - data['exchange'] = 'QUANDL' - - data['start_date'] = data['start_date'].astype(datetime) - data['end_date'] = data['end_date'].astype(datetime) - - data['auto_close_date'] = data['end_date'] + pd.Timedelta(days=1) - return data - - -def format_metadata_url(api_key, page_number): - """Build the query RL for the quandl WIKI metadata. - """ - query_params = [ - ('per_page', '100'), - ('sort_by', 'id'), - ('page', str(page_number)), - ('database_code', 'WIKI'), - ] - if api_key is not None: - query_params = [('api_key', api_key)] + query_params - return ( - 'https://www.quandl.com/api/v3/datasets.csv?' + urlencode(query_params) - ) - - -def format_wiki_url(api_key, symbol, start_date, end_date): - """ - Build a query URL for a quandl WIKI dataset. - """ - query_params = [ - ('start_date', start_date.strftime('%Y-%m-%d')), - ('end_date', end_date.strftime('%Y-%m-%d')), - ('order', 'asc'), - ] - if api_key is not None: - query_params = [('api_key', api_key)] + query_params - - return ( - "https://www.quandl.com/api/v3/datasets/WIKI/" - "{symbol}.csv?{query}".format( - symbol=symbol, - query=urlencode(query_params), - ) - ) - - -def fetch_single_equity(api_key, - symbol, - start_date, - end_date, - retries=5): - """ - Download data for a single equity. - """ - for _ in range(retries): - try: - return pd.read_csv( - format_wiki_url(api_key, symbol, start_date, end_date), - parse_dates=['Date'], - index_col='Date', - usecols=[ - 'Open', - 'High', - 'Low', - 'Close', - 'Volume', - 'Date', - 'Ex-Dividend', - 'Split Ratio', - ], - na_values=['NA'], - ).rename(columns={ - 'Open': 'open', - 'High': 'high', - 'Low': 'low', - 'Close': 'close', - 'Volume': 'volume', - 'Date': 'date', - 'Ex-Dividend': 'ex_dividend', - 'Split Ratio': 'split_ratio', - }) - except Exception: - log.exception("Exception raised reading Quandl data. Retrying.") - else: - raise ValueError( - "Failed to download data for %r after %d attempts." % ( - symbol, retries - ) + def fetch_raw_metadata_frame(self, api_key, page_number): + raw = pd.read_csv( + self._format_metadata_url(api_key, page_number), + date_parser=pd.tseries.tools.to_datetime, + parse_dates=[ + 'oldest_available_date', + 'newest_available_date', + ], + dtype={ + 'dataset_code': 'str', + 'name': 'str', + 'oldest_available_date': 'str', + 'newest_available_date': 'str', + }, + usecols=[ + 'dataset_code', + 'name', + 'oldest_available_date', + 'newest_available_date', + ], + ).rename( + columns={ + 'dataset_code': 'symbol', + 'name': 'asset_name', + 'oldest_available_date': 'start_date', + 'newest_available_date': 'end_date', + }, ) + raw['start_date'] = raw['start_date'].astype(datetime) + raw['end_date'] = raw['end_date'].astype(datetime) + raw['ac_date'] = raw['end_date'] + pd.Timedelta(days=1) -def _update_splits(splits, asset_id, raw_data): - split_ratios = raw_data.split_ratio - df = pd.DataFrame({'ratio': 1 / split_ratios[split_ratios != 1]}) - df.index.name = 'effective_date' - df.reset_index(inplace=True) - df['sid'] = asset_id - splits.append(df) + # Filter out invalid symbols + raw = raw[~raw.symbol.isin(self._excluded_symbols)] + # cut out all the other stuff in the name column + # we need to escape the paren because it is actually splitting on a regex + raw.asset_name = raw.asset_name.str.split(r' \(', 1).str.get(0) -def _update_dividends(dividends, asset_id, raw_data): - divs = raw_data.ex_dividend - df = pd.DataFrame({'amount': divs[divs != 0]}) - df.index.name = 'ex_date' - df.reset_index(inplace=True) - df['sid'] = asset_id - # we do not have this data in the WIKI dataset - df['record_date'] = df['declared_date'] = df['pay_date'] = pd.NaT - dividends.append(df) + return raw - -def gen_symbol_data(api_key, - cache, - symbol_map, - calendar, - start_session, - end_session, - splits, - dividends, - retries): - for asset_id, symbol in symbol_map.iteritems(): - start_time = time() - try: - # see if we have this data cached. - raw_data = cache[symbol] - should_sleep = False - except KeyError: - # we need to fetch the data and then write it to our cache - raw_data = cache[symbol] = fetch_single_equity( + def fetch_raw_symbol_frame(self, + api_key, + symbol, + calendar, + start_session, + end_session, + data_frequency): + raw_data = pd.read_csv( + self._format_wiki_url( api_key, symbol, - start_date=start_session, - end_date=end_session, - ) - should_sleep = True - - _update_splits(splits, asset_id, raw_data) - _update_dividends(dividends, asset_id, raw_data) + start_session, + end_session, + data_frequency, + ), + parse_dates=['Date'], + index_col='Date', + usecols=[ + 'Open', + 'High', + 'Low', + 'Close', + 'Volume', + 'Date', + 'Ex-Dividend', + 'Split Ratio', + ], + na_values=['NA'], + ).rename(columns={ + 'Open': 'open', + 'High': 'high', + 'Low': 'low', + 'Close': 'close', + 'Volume': 'volume', + 'Date': 'date', + 'Ex-Dividend': 'ex_dividend', + 'Split Ratio': 'split_ratio', + }) sessions = calendar.sessions_in_range(start_session, end_session) - raw_data = raw_data.reindex( + return raw_data.reindex( sessions.tz_localize(None), copy=False, ).fillna(0.0) - yield asset_id, raw_data - if should_sleep: - remaining = seconds_per_call - time() - start_time - if remaining > 0: - sleep(remaining) + def post_process_symbol_metadata(self, asset_id, sym_md, sym_data): + self._update_splits(asset_id, sym_data) + self._update_dividends(asset_id, sym_data) + + return sym_md + + def _update_splits(self, asset_id, raw_data): + split_ratios = raw_data.split_ratio + df = pd.DataFrame({'ratio': 1 / split_ratios[split_ratios != 1]}) + df.index.name = 'effective_date' + df.reset_index(inplace=True) + df['sid'] = asset_id + self.splits.append(df) -@bundles.register('quandl') -def quandl_bundle(environ, - asset_db_writer, - minute_bar_writer, - daily_bar_writer, - adjustment_writer, - calendar, - start_session, - end_session, - cache, - show_progress, - output_dir): - """Build a catalyst data bundle from the Quandl WIKI dataset. - """ - api_key = environ.get('QUANDL_API_KEY') - metadata = fetch_symbol_metadata_frame( - api_key, - cache=cache, - show_progress=show_progress, - ) - symbol_map = metadata.symbol - - # data we will collect in `gen_symbol_data` - splits = [] - dividends = [] - - asset_db_writer.write(metadata) - daily_bar_writer.write( - gen_symbol_data( - api_key, - cache, - symbol_map, - calendar, - start_session, - end_session, - splits, - dividends, - environ.get('QUANDL_DOWNLOAD_ATTEMPTS', 5), - ), - assets=metadata.index, - show_progress=show_progress, - ) - adjustment_writer.write( - splits=pd.concat(splits, ignore_index=True), - dividends=pd.concat(dividends, ignore_index=True), - ) + def _update_dividends(self, asset_id, raw_data): + divs = raw_data.ex_dividend + df = pd.DataFrame({'amount': divs[divs != 0]}) + df.index.name = 'ex_date' + df.reset_index(inplace=True) + df['sid'] = asset_id + # we do not have this data in the WIKI dataset + df['record_date'] = df['declared_date'] = df['pay_date'] = pd.NaT + self.dividends.append(df) -QUANTOPIAN_QUANDL_URL = ( - 'https://s3.amazonaws.com/quantopian-public-zipline-data/quandl' -) + def _format_metadata_url(self, api_key, page_number): + """Build the query RL for the quandl WIKI metadata. + """ + query_params = [ + ('per_page', '100'), + ('sort_by', 'id'), + ('page', str(page_number)), + ('database_code', 'WIKI'), + ] + if api_key is not None: + query_params = [('api_key', api_key)] + query_params - -@bundles.register('quantopian-quandl', create_writers=False) -def quantopian_quandl_bundle(environ, - asset_db_writer, - minute_bar_writer, - daily_bar_writer, - adjustment_writer, - calendar, - start_session, - end_session, - cache, - show_progress, - output_dir): - if show_progress: - data = bundles.download_with_progress( - QUANTOPIAN_QUANDL_URL, - chunk_size=bundles.ONE_MEGABYTE, - label="Downloading Bundle: quantopian-quandl", + return ( + 'https://www.quandl.com/api/v3/datasets.csv?' + urlencode(query_params) ) - else: - data = bundles.download_without_progress(QUANTOPIAN_QUANDL_URL) - - with tarfile.open('r', fileobj=data) as tar: - if show_progress: - print("Writing data to %s." % output_dir) - tar.extractall(output_dir) -register_calendar_alias("QUANDL", "NYSE") + def _format_wiki_url(self, + api_key, + symbol, + start_date, + end_date, + data_frequency): + """ + Build a query URL for a quandl WIKI dataset. + """ + query_params = [ + ('start_date', start_date.strftime('%Y-%m-%d')), + ('end_date', end_date.strftime('%Y-%m-%d')), + ('order', 'asc'), + ] + if api_key is not None: + query_params = [('api_key', api_key)] + query_params + + return ( + "https://www.quandl.com/api/v3/datasets/WIKI/" + "{symbol}.csv?{query}".format( + symbol=symbol, + query=urlencode(query_params), + ) + ) + +register_calendar_alias('QUANDL', 'NYSE') +register_bundle(QuandlBundle) diff --git a/catalyst/data/data_portal.py b/catalyst/data/data_portal.py index 9049f337..69aa166e 100644 --- a/catalyst/data/data_portal.py +++ b/catalyst/data/data_portal.py @@ -42,6 +42,7 @@ from catalyst.assets.roll_finder import ( ) from catalyst.data.dispatch_bar_reader import ( AssetDispatchMinuteBarReader, + AssetDispatchFiveMinuteBarReader, AssetDispatchSessionBarReader ) from catalyst.data.resample import ( @@ -114,12 +115,16 @@ class DataPortal(object): The calendar instance used to provide minute->session information. first_trading_day : pd.Timestamp The first trading day for the simulation. - equity_daily_reader : BcolzDailyBarReader, optional + daily_reader : BcolzDailyBarReader, optional The daily bar reader for equities. This will be used to service daily data backtests or daily history calls in a minute backetest. If a daily bar reader is not provided but a minute bar reader is, the minutes will be rolled up to serve the daily requests. - equity_minute_reader : BcolzMinuteBarReader, optional + five_minute_reader : BcolzFiveMinuteBarReader, optional + The five minute bar reader for equities. This will be used to service + 5-minute data backtests or five-minute history calls. This can be used + to serve daily calls if no daily bar reader is provided. + minute_reader : BcolzMinuteBarReader, optional The minute bar reader for equities. This will be used to service minute data backtests or minute history calls. This can be used to serve daily calls if no daily bar reader is provided. @@ -144,8 +149,9 @@ class DataPortal(object): asset_finder, trading_calendar, first_trading_day, - equity_daily_reader=None, - equity_minute_reader=None, + daily_reader=None, + five_minute_reader=None, + minute_reader=None, future_daily_reader=None, future_minute_reader=None, adjustment_reader=None, @@ -180,7 +186,7 @@ class DataPortal(object): # Infer the last session from the provided readers. last_sessions = [ reader.last_available_dt - for reader in [equity_daily_reader, future_daily_reader] + for reader in [daily_reader, future_daily_reader] if reader is not None ] if last_sessions: @@ -194,7 +200,11 @@ class DataPortal(object): # Infer the last minute from the provided readers. last_minutes = [ reader.last_available_dt - for reader in [equity_minute_reader, future_minute_reader] + for reader in [ + minute_reader, + five_minute_reader, + future_minute_reader, + ] if reader is not None ] if last_minutes: @@ -202,10 +212,12 @@ class DataPortal(object): else: self._last_available_minute = None - aligned_equity_minute_reader = self._ensure_reader_aligned( - equity_minute_reader) - aligned_equity_session_reader = self._ensure_reader_aligned( - equity_daily_reader) + aligned_minute_reader = self._ensure_reader_aligned( + minute_reader) + aligned_five_minute_reader = self._ensure_reader_aligned( + five_minute_reader) + aligned_session_reader = self._ensure_reader_aligned( + daily_reader) aligned_future_minute_reader = self._ensure_reader_aligned( future_minute_reader) aligned_future_session_reader = self._ensure_reader_aligned( @@ -217,12 +229,15 @@ class DataPortal(object): } aligned_minute_readers = {} + aligned_five_minute_readers = {} aligned_session_readers = {} - if aligned_equity_minute_reader is not None: - aligned_minute_readers[Equity] = aligned_equity_minute_reader - if aligned_equity_session_reader is not None: - aligned_session_readers[Equity] = aligned_equity_session_reader + if aligned_minute_reader is not None: + aligned_minute_readers[Equity] = aligned_minute_reader + if aligned_five_minute_reader is not None: + aligned_five_minute_readers[Equity] = aligned_five_minute_reader + if aligned_session_reader is not None: + aligned_session_readers[Equity] = aligned_session_reader if aligned_future_minute_reader is not None: aligned_minute_readers[Future] = aligned_future_minute_reader @@ -252,6 +267,13 @@ class DataPortal(object): self._last_available_minute, ) + _dispatch_five_minute_reader = AssetDispatchFiveMinuteBarReader( + self.trading_calendar, + self.asset_finder, + aligned_five_minute_readers, + self._last_available_minute, + ) + _dispatch_session_reader = AssetDispatchSessionBarReader( self.trading_calendar, self.asset_finder, @@ -261,6 +283,7 @@ class DataPortal(object): self._pricing_readers = { 'minute': _dispatch_minute_reader, + '5-minute': _dispatch_five_minute_reader, 'daily': _dispatch_session_reader, } @@ -514,15 +537,17 @@ class DataPortal(object): ) else: if field == "last_traded": - return self.get_last_traded_dt(asset, dt, 'minute') + return self.get_last_traded_dt(asset, dt, data_frequency) elif field == "price": - return self._get_minute_spot_value( - asset, "close", dt, ffill=True, + return self._get_minutely_spot_value( + asset, "close", dt, data_frequency, ffill=True, ) elif field == "contract": return self._get_current_contract(asset, dt) else: - return self._get_minute_spot_value(asset, field, dt) + return self._get_minutely_spot_value( + asset, field, dt, data_frequency, + ) if assets_is_scalar: return get_single_asset_value(assets) @@ -648,8 +673,14 @@ class DataPortal(object): return spot_value - def _get_minute_spot_value(self, asset, column, dt, ffill=False): - reader = self._get_pricing_reader('minute') + def _get_minutely_spot_value(self, + asset, + column, + dt, + data_frequency, + ffill=False): + + reader = self._get_pricing_reader(data_frequency) if ffill: # If forward filling, we want the last minute with values (up to @@ -680,8 +711,32 @@ class DataPortal(object): # the value we found came from a different day, so we have to adjust # the data if there are any adjustments on that day barrier return self.get_adjusted_value( - asset, column, query_dt, - dt, "minute", spot_value=result + asset, + column, + query_dt, + dt, + data_frequency, + spot_value=result + ) + + + def _get_five_minute_spot_value(self, asset, column, dt, ffill=False): + return self._get_minutely_spot_value( + asset, + column, + dt, + ffill, + '5-minute', + ) + + + def _get_minute_spot_value(self, asset, column, dt, ffill=False): + return self._get_minutely_spot_value( + asset, + column, + dt, + ffill, + 'minute', ) def _get_daily_spot_value(self, asset, column, dt): diff --git a/catalyst/data/dispatch_bar_reader.py b/catalyst/data/dispatch_bar_reader.py index 42770e55..e545eef0 100644 --- a/catalyst/data/dispatch_bar_reader.py +++ b/catalyst/data/dispatch_bar_reader.py @@ -130,13 +130,17 @@ class AssetDispatchBarReader(with_metaclass(ABCMeta)): return results - class AssetDispatchMinuteBarReader(AssetDispatchBarReader): def _dt_window_size(self, start_dt, end_dt): return len(self.trading_calendar.minutes_in_range(start_dt, end_dt)) +class AssetDispatchFiveMinuteBarReader(AssetDispatchBarReader): + + def _dt_window_size(self, start_dt, end_dt): + return len(self.trading_calendar.five_minutes_in_range(start_dt, end_dt)) + class AssetDispatchSessionBarReader(AssetDispatchBarReader): def _dt_window_size(self, start_dt, end_dt): diff --git a/catalyst/data/five_minute_bars.py b/catalyst/data/five_minute_bars.py index b772c22b..9021dc0a 100644 --- a/catalyst/data/five_minute_bars.py +++ b/catalyst/data/five_minute_bars.py @@ -24,6 +24,10 @@ from bcolz import ctable from intervaltree import IntervalTree import logbook import numpy as np +from numpy import ( + iinfo, + uint64, +) import pandas as pd from pandas import HDFStore import tables @@ -31,31 +35,39 @@ from six import with_metaclass from toolz import keymap, valmap from catalyst.data._minute_bar_internal import ( - minute_value, - find_position_of_minute, - find_last_traded_position_internal + five_minute_value, + find_position_of_five_minute, + find_last_traded_five_minute_position_internal, ) from catalyst.gens.sim_engine import NANOS_IN_MINUTE from catalyst.data.bar_reader import BarReader, NoDataOnDate -from catalyst.data.us_equity_pricing import check_uint32_safe +from catalyst.data.us_equity_pricing import ( + winsorise_uint64, + check_uint64_safe, +) from catalyst.utils.calendars import get_calendar -from catalyst.utils.cli import maybe_show_progress +from catalyst.utils.cli import ( + item_show_count, + maybe_show_progress, +) from catalyst.utils.memoize import lazyval - logger = logbook.Logger('FiveMinuteBars') -CRYPTO_ASSETS_FIVE_MINUTES_PER_DAY = 288 -US_EQUITIES_MINUTES_PER_DAY = 390 -FUTURES_MINUTES_PER_DAY = 1440 +OPEN_FIVE_MINUTES_PER_DAY = 288 -DEFAULT_EXPECTEDLEN = US_EQUITIES_MINUTES_PER_DAY * 252 * 15 -DEFAULT_EXPECTED_CRYPTO_LEN = CRYPTO_ASSETS_FIVE_MINUTES_PER_DAY * 366 * 15 +DEFAULT_EXPECTEDLEN_CRYPTO = OPEN_FIVE_MINUTES_PER_DAY * 366 * 15 -OHLC_RATIO = 1000 +OHLC_RATIO = 1000000 +OHLC = frozenset(['open', 'high', 'low', 'close']) +OHLCV = frozenset(['open', 'high', 'low', 'close', 'volume']) + +UINT64_MAX = iinfo(uint64).max + +NANOS_IN_FIVE_MINUTES = 5 * NANOS_IN_MINUTE class BcolzFiveMinuteOverlappingData(Exception): pass @@ -68,13 +80,13 @@ class BcolzFiveMinuteWriterColumnMismatch(Exception): class FiveMinuteBarReader(BarReader): @property def data_frequency(self): - return "five-minute" + return "5-minute" def _calc_five_minute_index(market_opens, five_minutes_per_day): five_minutes = np.zeros(len(market_opens) * five_minutes_per_day, dtype='datetime64[ns]') - deltas = np.arange(0, five_minutes_per_day, dtype='timedelta64[m]') + deltas = 5 * np.arange(0, five_minutes_per_day, dtype='timedelta64[m]') for i, market_open in enumerate(market_opens): start = market_open.asm8 five_minute_values = start + deltas @@ -116,19 +128,19 @@ def _sid_subdir_path(sid): def convert_cols(cols, scale_factor, sid, invalid_data_behavior): - """Adapt OHLCV columns into uint32 columns. + """Adapt OHLCV columns into uint64 columns. Parameters ---------- cols : dict A dict mapping each column name (open, high, low, close, volume) - to a float column to convert to uint32. + to a float column to convert to uint64. scale_factor : int - Factor to use to scale float values before converting to uint32. + Factor to use to scale float values before converting to uint64. sid : int Sid of the relevant asset, for logging. invalid_data_behavior : str - Specifies behavior when data cannot be converted to uint32. + Specifies behavior when data cannot be converted to uint64. If 'raise', raises an exception. If 'warn', logs a warning and filters out incompatible values. If 'ignore', silently filters out incompatible values. @@ -137,6 +149,7 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior): scaled_highs = np.nan_to_num(cols['high']) * scale_factor scaled_lows = np.nan_to_num(cols['low']) * scale_factor scaled_closes = np.nan_to_num(cols['close']) * scale_factor + volumes = np.nan_to_num(cols['volume']) exclude_mask = np.zeros_like(scaled_opens, dtype=bool) @@ -145,11 +158,12 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior): ('high', scaled_highs), ('low', scaled_lows), ('close', scaled_closes), + ('volume', volumes), ]: max_val = scaled_col.max() try: - check_uint32_safe(max_val, col_name) + check_uint64_safe(max_val, col_name) except ValueError: if invalid_data_behavior == 'raise': raise @@ -157,20 +171,20 @@ def convert_cols(cols, scale_factor, sid, invalid_data_behavior): if invalid_data_behavior == 'warn': logger.warn( 'Values for sid={}, col={} contain some too large for ' - 'uint32 (max={}), filtering them out', + 'uint64 (max={}), filtering them out', sid, col_name, max_val, ) # We want to exclude all rows that have an unsafe value in # this column. - exclude_mask &= (scaled_col >= np.iinfo(np.uint32).max) + exclude_mask &= (scaled_col >= iinfo(uint64).max) - # Convert all cols to uint32. - opens = scaled_opens.astype(np.uint32) - highs = scaled_highs.astype(np.uint32) - lows = scaled_lows.astype(np.uint32) - closes = scaled_closes.astype(np.uint32) - volumes = cols['volume'].astype(np.uint32) + # Convert all cols to uint64. + opens = scaled_opens.astype(uint64) + highs = scaled_highs.astype(uint64) + lows = scaled_lows.astype(uint64) + closes = scaled_closes.astype(uint64) + volumes = volumes.astype(uint64) # Exclude rows with unsafe values by setting to zero. opens[exclude_mask] = 0 @@ -200,7 +214,7 @@ class BcolzFiveMinuteBarMetadata(object): """ FORMAT_VERSION = 3 - METADATA_FILENAME = 'metadata.json' + METADATA_FILENAME = 'five-minute-metadata.json' @classmethod def metadata_path(cls, rootdir): @@ -257,7 +271,7 @@ class BcolzFiveMinuteBarMetadata(object): calendar, start_session, end_session, - minutes_per_day, + five_minutes_per_day, version=version, ) @@ -290,7 +304,7 @@ class BcolzFiveMinuteBarMetadata(object): ohlc_ratio : int The default ratio by which to multiply the pricing data to convert the floats from floats to an integer to fit within - the np.uint32. If ohlc_ratios_per_sid is None or does not + the np.uint64. If ohlc_ratios_per_sid is None or does not contain a mapping for a given sid, this ratio is used. ohlc_ratios_per_sid : dict A dict mapping each sid in the output to the factor by @@ -334,7 +348,7 @@ class BcolzFiveMinuteBarMetadata(object): 'version': self.version, 'ohlc_ratio': self.default_ohlc_ratio, 'ohlc_ratios_per_sid': self.ohlc_ratios_per_sid, - 'minutes_per_day': self.five_minutes_per_day, + 'five_minutes_per_day': self.five_minutes_per_day, 'calendar_name': self.calendar.name, 'start_session': str(self.start_session.date()), 'end_session': str(self.end_session.date()), @@ -374,13 +388,13 @@ class BcolzFiveMinuteBarWriter(object): The last trading session in the data set. default_ohlc_ratio : int, optional The default ratio by which to multiply the pricing data to - convert from floats to integers that fit within np.uint32. If + convert from floats to integers that fit within np.uint64. If ohlc_ratios_per_sid is None or does not contain a mapping for a given sid, this ratio is used. Default is OHLC_RATIO (1000). ohlc_ratios_per_sid : dict, optional A dict mapping each sid in the output to the ratio by which to multiply the pricing data to convert the floats from floats to - an integer to fit within the np.uint32. + an integer to fit within the np.uint64. expectedlen : int, optional The expected length of the dataset, used when creating the initial bcolz ctable. @@ -405,9 +419,9 @@ class BcolzFiveMinuteBarWriter(object): The open, high, low, and close columns are integers which are 1000 times the quoted price, so that the data can represented and stored as an - np.uint32, supporting market prices quoted up to the thousands place. + np.uint64, supporting market prices quoted up to the thousands place. - volume is a np.uint32 with no mutation of the tens place. + volume is a np.uint64 with no mutation of the tens place. The 'index' for each individual asset are a repeating period of minutes of length `minutes_per_day` starting from each market open. @@ -450,7 +464,7 @@ class BcolzFiveMinuteBarWriter(object): five_minutes_per_day, default_ohlc_ratio=OHLC_RATIO, ohlc_ratios_per_sid=None, - expectedlen=DEFAULT_EXPECTED_CRYPTO_LEN, + expectedlen=DEFAULT_EXPECTEDLEN_CRYPTO, write_metadata=True): self._rootdir = rootdir @@ -466,11 +480,11 @@ class BcolzFiveMinuteBarWriter(object): self._default_ohlc_ratio = default_ohlc_ratio self._ohlc_ratios_per_sid = ohlc_ratios_per_sid - self._minute_index = _calc_minute_index( - self._schedule.market_open, self._minutes_per_day) + self._five_minute_index = _calc_five_minute_index( + self._schedule.market_open, self._five_minutes_per_day) if write_metadata: - metadata = BcolzMinuteBarMetadata( + metadata = BcolzFiveMinuteBarMetadata( self._default_ohlc_ratio, self._ohlc_ratios_per_sid, self._calendar, @@ -575,7 +589,7 @@ class BcolzFiveMinuteBarWriter(object): if not os.path.exists(sid_containing_dirname): # Other sids may have already created the containing directory. os.makedirs(sid_containing_dirname) - initial_array = np.empty(0, np.uint32) + initial_array = np.empty(0, np.uint64) table = ctable( rootdir=path, columns=[ @@ -612,7 +626,7 @@ class BcolzFiveMinuteBarWriter(object): five_minute_offset = len(table) % self._five_minutes_per_day num_to_prepend = numdays * self._five_minutes_per_day - five_minute_offset - prepend_array = np.zeros(num_to_prepend, np.uint32) + prepend_array = np.zeros(num_to_prepend, np.uint64) # Fill all OHLCV with zeros. table.append([prepend_array] * 5) table.flush() @@ -667,7 +681,11 @@ class BcolzFiveMinuteBarWriter(object): for k, v in kwargs.items(): table.attrs[k] = v - def write(self, data, show_progress=False, invalid_data_behavior='warn'): + def write(self, + data, + length=None, + show_progress=False, + invalid_data_behavior='warn'): """Write a stream of minute data. Parameters @@ -687,14 +705,15 @@ class BcolzFiveMinuteBarWriter(object): show_progress : bool, optional Whether or not to show a progress bar while writing. """ - ctx = maybe_show_progress( + with maybe_show_progress( data, + length=length, + show_percent=False, show_progress=show_progress, - item_show_func=lambda e: e if e is None else str(e[0]), - label="Merging minute equity files:", - ) - write_sid = self.write_sid - with ctx as it: + item_show_func=item_show_count(length), + label='Compiling five-minute data', + ) as it: + write_sid = self.write_sid for e in it: write_sid(*e, invalid_data_behavior=invalid_data_behavior) @@ -796,10 +815,13 @@ class BcolzFiveMinuteBarWriter(object): # Get the number of minutes already recorded in this sid's ctable num_rec_mins = table.size - all_minutes = self._minute_index + all_minutes = self._five_minute_index # Get the latest minute we wish to write to the ctable last_minute_to_write = pd.Timestamp(dts[-1], tz='UTC') + #print 'all_minutes[-1]:', all_minutes[num_rec_mins-1] + #print 'last_minute_to_write:', last_minute_to_write + # In the event that we've already written some minutely data to the # ctable, guard against overwriting that data. if num_rec_mins > 0: @@ -817,11 +839,11 @@ class BcolzFiveMinuteBarWriter(object): minutes_count = all_minutes_in_window.size - open_col = np.zeros(minutes_count, dtype=np.uint32) - high_col = np.zeros(minutes_count, dtype=np.uint32) - low_col = np.zeros(minutes_count, dtype=np.uint32) - close_col = np.zeros(minutes_count, dtype=np.uint32) - vol_col = np.zeros(minutes_count, dtype=np.uint32) + open_col = np.zeros(minutes_count, dtype=uint64) + high_col = np.zeros(minutes_count, dtype=uint64) + low_col = np.zeros(minutes_count, dtype=uint64) + close_col = np.zeros(minutes_count, dtype=uint64) + vol_col = np.zeros(minutes_count, dtype=uint64) dt_ixs = np.searchsorted(all_minutes_in_window.values, dts.astype('datetime64[ns]')) @@ -853,7 +875,7 @@ class BcolzFiveMinuteBarWriter(object): day_ix = self._session_labels.get_loc(day) # Add one to the 0-indexed day_ix to get the number of days. num_days = day_ix + 1 - return num_days * self._minutes_per_day + return num_days * self._five_minutes_per_day def truncate(self, date): """Truncate data beyond this date in all ctables.""" @@ -991,7 +1013,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): market_closes = self._market_closes.values.astype('datetime64[m]') minutes_per_day = (market_closes - market_opens).astype(np.int64) / 5 early_indices = np.where( - minutes_per_day != self._minutes_per_day - 1)[0] + minutes_per_day != self._five_minutes_per_day - 1)[0] early_opens = self._market_opens[early_indices] early_closes = self._market_closes[early_indices] minutes = [(market_open, early_close) @@ -1019,9 +1041,9 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): """ itree = IntervalTree() for market_open, early_close in self._minutes_to_exclude(): - start_pos = self._find_position_of_minute(early_close) + 1 + start_pos = self._find_position_of_five_minute(early_close) + 1 end_pos = ( - self._find_position_of_minute(market_open) + self._find_position_of_five_minute(market_open) + self._five_minutes_per_day - @@ -1110,7 +1132,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): minute_pos = self._last_get_value_dt_position else: try: - minute_pos = self._find_position_of_minute(dt) + minute_pos = self._find_position_of_five_minute(dt) except ValueError: raise NoDataOnDate() @@ -1132,15 +1154,15 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): return value def get_last_traded_dt(self, asset, dt): - minute_pos = self._find_last_traded_position(asset, dt) + minute_pos = self._find_last_traded_five_minute_position(asset, dt) if minute_pos == -1: return pd.NaT return self._pos_to_minute(minute_pos) - def _find_last_traded_position(self, asset, dt): + def _find_last_traded_five_minute_position(self, asset, dt): volumes = self._open_minute_file('volume', asset) - start_date_minute = asset.start_date.value / NANOS_IN_MINUTE - dt_minute = dt.value / NANOS_IN_MINUTE + start_date_minute = asset.start_date.value / NANOS_IN_FIVE_MINUTE + dt_minute = dt.value / NANOS_IN_FIVE_MINUTE try: # if we know of a dt before which this asset has no volume, @@ -1152,13 +1174,13 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): if dt_minute < earliest_dt_to_search: return -1 - pos = find_last_traded_position_internal( + pos = find_last_traded_five_minute_position_internal( self._market_open_values, self._market_close_values, dt_minute, earliest_dt_to_search, volumes, - self._minutes_per_day, + self._five_minutes_per_day, ) if pos == -1: @@ -1175,15 +1197,15 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): return pos def _pos_to_minute(self, pos): - minute_epoch = minute_value( + minute_epoch = five_minute_value( self._market_open_values, pos, - self._minutes_per_day + self._five_minutes_per_day ) return pd.Timestamp(minute_epoch, tz='UTC', unit="m") - def _find_position_of_minute(self, minute_dt): + def _find_position_of_five_minute(self, minute_dt): """ Internal method that returns the position of the given minute in the list of every trading minute since market open of the first trading @@ -1202,11 +1224,11 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): int: The position of the given minute in the list of all trading minutes since market open on the first trading day. """ - return find_position_of_minute( + return find_position_of_five_minute( self._market_open_values, self._market_close_values, - minute_dt.value / NANOS_IN_MINUTE, - self._minutes_per_day, + minute_dt.value / NANOS_IN_FIVE_MINUTE, + self._five_minutes_per_day, False, ) @@ -1230,8 +1252,8 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): (minutes in range, sids) with a dtype of float64, containing the values for the respective field over start and end dt range. """ - start_idx = self._find_position_of_minute(start_dt) - end_idx = self._find_position_of_minute(end_dt) + start_idx = self._find_position_of_five_minute(start_dt) + end_idx = self._find_position_of_five_minute(end_dt) num_minutes = (end_idx - start_idx + 1) @@ -1250,7 +1272,7 @@ class BcolzFiveMinuteBarReader(FiveMinuteBarReader): if field != 'volume': out = np.full(shape, np.nan) else: - out = np.zeros(shape, dtype=np.uint32) + out = np.zeros(shape, dtype=uint64) for i, sid in enumerate(sids): carray = self._open_minute_file(field, sid) diff --git a/catalyst/data/loader.py b/catalyst/data/loader.py index 0ba94e89..b42b424a 100644 --- a/catalyst/data/loader.py +++ b/catalyst/data/loader.py @@ -33,7 +33,7 @@ from ..utils.paths import ( ) from ..utils.deprecate import deprecated -from catalyst.curate.poloniex import PoloniexCurator +from catalyst.data.bundles.poloniex import PoloniexBundle from catalyst.utils.calendars import get_calendar @@ -93,8 +93,8 @@ def has_data_for_dates(series_or_df, first_date, last_date): dts = series_or_df.index if not isinstance(dts, pd.DatetimeIndex): raise TypeError("Expected a DatetimeIndex, but got %s." % type(dts)) - first, last = dts[[0, -1]] - return (first <= first_date) and (last >= last_date) + first, last = dts[[0, -1]].tz_localize(None) + return (first <= first_date.tz_localize(None)) and (last >= last_date.tz_localize(None)) def load_crypto_market_data(trading_day=None, trading_days=None, @@ -134,17 +134,19 @@ def load_crypto_market_data(trading_day=None, trading_day, environ, ) + # Override first_date for treasury data since we have it for many more years + # and is independent of crypto data + first_date_treasury = pd.Timestamp('1990-01-01', tz='UTC') tc = ensure_treasury_data( bm_symbol, - first_date, + first_date_treasury, last_date, now, environ, ) benchmark_returns = br[br.index.slice_indexer(first_date, last_date)] - treasury_curves = tc[tc.index.slice_indexer(first_date, last_date)] + treasury_curves = tc[tc.index.slice_indexer(first_date_treasury, last_date)] return benchmark_returns, treasury_curves - def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY', @@ -232,11 +234,15 @@ def load_market_data(trading_day=None, trading_days=None, bm_symbol='SPY', treasury_curves = tc[tc.index.slice_indexer(first_date, last_date)] return benchmark_returns, treasury_curves -def ensure_crypto_benchmark_data(symbol, first_date, last_date, now, - trading_day, environ=None): + +def ensure_crypto_benchmark_data(symbol, + first_date, + last_date, + now, + trading_day, + environ=None): + filename = get_benchmark_filename(symbol) - source_filename = '/var/tmp/catalyst/data/poloniex/crypto_prices-{0}.csv'.\ - format(symbol) logger.info( ('Loading benchmark data for {symbol!r} ' @@ -269,92 +275,23 @@ def ensure_crypto_benchmark_data(symbol, first_date, last_date, now, last_date=last_date ) - def dateparse(time_in_secs): - return datetime.datetime.fromtimestamp(float(time_in_secs), pytz.utc) - - def compute_daily_bars(five_min_bars, schedule): - # filter and copy the entry at the beginning of each session - daily_bars = five_min_bars[ - five_min_bars.index.isin(schedule) - ].copy() - - day_offset = pd.Timedelta(days=1) - - # iterate through session starts doing: - # 1. filter five_min_bars to get all entries in one day - # 2. compute daily bar entry - # 3. record in rid-th row of daily_bars - for rid, start_date in enumerate(daily_bars.index): - # compute beginning of next session - end_date = start_date + day_offset - - # filter for entries session entries - day_data = five_min_bars[ - (five_min_bars.index >= start_date) & - (five_min_bars.index < end_date) - ] - - # compute and record daily bar - daily_bars.iloc[rid] = ( - day_data.open.iloc[0], # first open price - day_data.high.max(), # max of high prices - day_data.low.min(), # min of low prices - day_data.close.iloc[-1], # last close prices - day_data.volume.sum(), # sum of all volumes - ) - - # scale to allow trading 10-ths of a coin - scale = 10.0 - daily_bars.loc[:, 'open'] /= scale - daily_bars.loc[:, 'high'] /= scale - daily_bars.loc[:, 'low'] /= scale - daily_bars.loc[:, 'close'] /= scale - daily_bars.loc[:, 'volume'] *= scale - - return daily_bars - - - five_min_bars = None + # Load benchmark symbol from Poloniex API try: - # load five minute bars from csv cache - five_min_bars = pd.read_csv( - source_filename, - names=['date', 'open', 'high', 'low', 'close', 'volume'], - index_col=[0], - parse_dates=True, - date_parser=dateparse, + bundle = PoloniexBundle() + bench_raw = bundle._fetch_symbol_frame( + None, + symbol, + get_calendar(bundle.calendar_name), + first_date - trading_day, + last_date, + 'daily', ) - five_min_bars.index = pd.to_datetime(five_min_bars.index, utc=True, unit='s') - except (OSError, IOError): - # Otherwise load from Poloniex API - try: - pc = PoloniexCurator() - pc.append_data_single_pair(symbol) - - five_min_bars = pc.to_dataframe( - time.mktime(first_date.timetuple()), - time.mktime(last_date.timetuple()), - currencyPair=symbol, - ) - except (OSError, IOError, HTTPError): - logger.exception('Failed to new crypto benchmark returns') - raise - - # compute daily bars for open calendar - open_calendar = get_calendar('OPEN') - daily_bars = compute_daily_bars( - five_min_bars, - open_calendar.all_sessions, - ) - - # filter daily bars to include first_date and last_date - daily_bars = daily_bars[ - (daily_bars.index >= (first_date - trading_day)) & - (daily_bars.index <= last_date) - ] + except (OSError, IOError, HTTPError): + logger.exception('Failed to fetch new crypto benchmark returns') + raise # select close column and compute percent change between days - daily_close = daily_bars[['close']] + daily_close = bench_raw[['close']] daily_close = daily_close.pct_change(1).iloc[1:] try: @@ -430,6 +367,7 @@ def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day, logger.warn("Still don't have expected data after redownload!") return data + def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day, environ=None): """ @@ -544,11 +482,6 @@ def ensure_treasury_data(symbol, first_date, last_date, now, environ=None): def _load_cached_data(filename, first_date, last_date, now, resource_name, environ=None): - if resource_name == 'benchmark': - from_csv = pd.Series.from_csv - else: - from_csv = pd.DataFrame.from_csv - # Path for the cache. path = get_data_filepath(filename, environ) @@ -556,8 +489,10 @@ def _load_cached_data(filename, first_date, last_date, now, resource_name, # yet, so don't try to read from 'path'. if os.path.exists(path): try: - data = from_csv(path) - data.index = pd.to_datetime(data.index).tz_localize('UTC') + data = pd.DataFrame.from_csv(path) + if data.empty: + raise ValueError("File is empty.") + data.index = pd.to_datetime(data.index, infer_datetime_format=True, errors='coerce' ).tz_localize('UTC') if has_data_for_dates(data, first_date, last_date): return data diff --git a/catalyst/data/us_equity_pricing.py b/catalyst/data/us_equity_pricing.py index f20ac003..03d01a4e 100644 --- a/catalyst/data/us_equity_pricing.py +++ b/catalyst/data/us_equity_pricing.py @@ -73,7 +73,10 @@ from catalyst.utils.sqlite_utils import ( coerce_string_to_conn, ) from catalyst.utils.memoize import lazyval -from catalyst.utils.cli import maybe_show_progress +from catalyst.utils.cli import ( + item_show_count, + maybe_show_progress, +) from ._equities import _compute_row_slices, _read_bcolz_data from ._adjustments import load_adjustments_from_sqlite @@ -117,7 +120,15 @@ UINT64_MAX = iinfo(uint64).max def check_uint32_safe(value, colname): if value >= UINT32_MAX: raise ValueError( - "Value %s from column '%s' is too large" % (value, colname) + "Value %s from column '%s' is too large " + "for uint32" % (value, colname) + ) + +def check_uint64_safe(value, colname): + if value >= UINT64_MAX: + raise ValueError( + "Value %s from column '%s' is too large " + "for uint64" % (value, colname) ) @@ -218,10 +229,7 @@ class BcolzDailyBarWriter(object): @property def progress_bar_message(self): - return "Merging daily equity files:" - - def progress_bar_item_show_func(self, value): - return value if value is None else str(value[0]) + return 'Compiling daily data' def write(self, data, @@ -249,15 +257,17 @@ class BcolzDailyBarWriter(object): table : bcolz.ctable The newly-written table. """ + total = None if assets is None else len(assets) ctx = maybe_show_progress( ( (sid, self.to_ctable(df, invalid_data_behavior)) for sid, df in data ), show_progress=show_progress, - item_show_func=self.progress_bar_item_show_func, label=self.progress_bar_message, - length=len(assets) if assets is not None else None, + item_show_func=item_show_count(total), + length=total, + show_percent=False, ) with ctx as it: return self._write_internal(it, assets) diff --git a/catalyst/examples/buy_and_hodl.py b/catalyst/examples/buy_and_hodl.py index c3b832f1..57b7f19e 100644 --- a/catalyst/examples/buy_and_hodl.py +++ b/catalyst/examples/buy_and_hodl.py @@ -25,7 +25,7 @@ from catalyst.api import ( def initialize(context): - context.ASSET_NAME = 'USDT_ETH' + context.ASSET_NAME = 'USDT_BTC' context.TARGET_HODL_RATIO = 0.8 context.RESERVE_RATIO = 1.0 - context.TARGET_HODL_RATIO @@ -37,7 +37,13 @@ def initialize(context): context.is_buying = True context.asset = symbol(context.ASSET_NAME) + context.i = 0 + def handle_data(context, data): + context.i += 1 + + print 'i:', context.i + starting_cash = context.portfolio.starting_cash target_hodl_value = context.TARGET_HODL_RATIO * starting_cash reserve_value = context.RESERVE_RATIO * starting_cash diff --git a/catalyst/examples/dual_vwap.py b/catalyst/examples/dual_vwap.py index 81155ba1..52c1789d 100644 --- a/catalyst/examples/dual_vwap.py +++ b/catalyst/examples/dual_vwap.py @@ -52,7 +52,7 @@ def initialize(context): schedule_function( rebalance, - date_rules.every_day(), + time_rules=times_rules.every_minute(), ) diff --git a/catalyst/finance/performance/tracker.py b/catalyst/finance/performance/tracker.py index 6b84431e..810f2c32 100644 --- a/catalyst/finance/performance/tracker.py +++ b/catalyst/finance/performance/tracker.py @@ -111,6 +111,21 @@ class PerformanceTracker(object): self.treasury_curves, self.trading_calendar ) + elif self.emission_rate == '5-minute': + self.all_benchmark_returns = pd.Series( + index=pd.date_range( + self.sim_params.first_open, + self.sim_params.last_close, + freq='5min' + ), + ) + self.cumulative_risk_metrics = \ + risk.RiskMetricsCumulative( + self.sim_params, + self.treasury_curves, + self.trading_calendar, + create_first_day_stats=True, + ) elif self.emission_rate == 'minute': self.all_benchmark_returns = pd.Series(index=pd.date_range( self.sim_params.first_open, self.sim_params.last_close, diff --git a/catalyst/finance/risk/risk.py b/catalyst/finance/risk/risk.py index 2d371b56..7faca596 100644 --- a/catalyst/finance/risk/risk.py +++ b/catalyst/finance/risk/risk.py @@ -158,7 +158,7 @@ def choose_treasury(select_treasury, treasury_curves, start_session, ) break - if search_day: + if search_day and trading_calendar.name != 'OPEN': # Supress warning for 'OPEN' calendar if (search_dist is None or search_dist > 1) and \ search_days[0] <= end_session <= search_days[-1]: message = "No rate within 1 trading day of end date = \ diff --git a/catalyst/gens/sim_engine.pyx b/catalyst/gens/sim_engine.pyx index aa3a9d51..a318f292 100644 --- a/catalyst/gens/sim_engine.pyx +++ b/catalyst/gens/sim_engine.pyx @@ -20,7 +20,9 @@ cimport cython from cpython cimport bool cdef np.int64_t _nanos_in_minute = 60000000000 +cdef np.int64_t _nanos_in_five_minutes = 5 * _nanos_in_minute NANOS_IN_MINUTE = _nanos_in_minute +NANOS_IN_FIVE_MINUTES = _nanos_in_five_minutes cpdef enum: BAR = 0 @@ -115,3 +117,24 @@ cdef class MinuteSimulationClock: yield minute, BAR if minute_emission: yield minute, MINUTE_END + +cdef class FiveMinuteSimulationClock(MinuteSimulationClock): + @cython.boundscheck(False) + @cython.wraparound(False) + cdef dict calc_minutes_by_session(self): + cdef dict five_minutes_by_session + cdef int session_idx + cdef np.int64_t session_nano + cdef np.ndarray[np.int64_t, ndim=1] five_minutes_nanos + + five_minutes_by_session = {} + for session_idx, session_nano in enumerate(self.sessions_nanos): + five_minutes_nanos = np.arange( + self.market_opens_nanos[session_idx], + self.market_closes_nanos[session_idx], + _nanos_in_five_minutes + ) + five_minutes_by_session[session_nano] = pd.to_datetime( + five_minutes_nanos, utc=True, box=True + ) + return five_minutes_by_session diff --git a/catalyst/gens/tradesimulation.py b/catalyst/gens/tradesimulation.py index f7daa233..1ef9dfbf 100644 --- a/catalyst/gens/tradesimulation.py +++ b/catalyst/gens/tradesimulation.py @@ -34,6 +34,7 @@ class AlgorithmSimulator(object): EMISSION_TO_PERF_KEY_MAP = { 'minute': 'minute_perf', + '5-minute': '5_minute_perf', 'daily': 'daily_perf' } @@ -201,7 +202,7 @@ class AlgorithmSimulator(object): stack.enter_context(self.processor) stack.enter_context(ZiplineAPI(self.algo)) - if algo.data_frequency == 'minute': + if algo.data_frequency in set(('minute', '5-minute')): def execute_order_cancellation_policy(): algo.blotter.execute_cancel_policy(SESSION_END) diff --git a/catalyst/pipeline/loaders/crypto_pricing_loader.py b/catalyst/pipeline/loaders/crypto_pricing_loader.py index f8676cce..22c0ccf2 100644 --- a/catalyst/pipeline/loaders/crypto_pricing_loader.py +++ b/catalyst/pipeline/loaders/crypto_pricing_loader.py @@ -33,13 +33,30 @@ class CryptoPricingLoader(PipelineLoader): Delegates loading of baselines and adjustments. """ - def __init__(self, raw_price_loader, dataset): - self.raw_price_loader = raw_price_loader - self._columns = dataset.columns + def __init__(self, bundle, data_frequency, dataset): cal = get_calendar('OPEN') - self._all_sessions = cal.all_sessions + if data_frequency == 'daily': + reader = bundle.daily_bar_reader + all_sessions = cal.all_sessions + + elif data_frequency == '5-minute': + reader = bundle.five_minute_bar_reader + all_sessions = cal.all_five_minutes + + elif data_frequency == 'minute': + reader = bundle.minute_bar_reader + all_sessions = cal.all_minutes + + else: + raise ValueError( + 'Invalid data frequency: {}'.format(data_frequency) + ) + + self.raw_price_loader = reader + self._columns = dataset.columns + self._all_sessions = all_sessions @classmethod def from_files(cls, pricing_path): @@ -89,6 +106,7 @@ class CryptoPricingLoader(PipelineLoader): def _shift_dates(dates, start_date, end_date, shift): + try: start = dates.get_loc(start_date) except KeyError: diff --git a/catalyst/pipeline/loaders/equity_pricing_loader.py b/catalyst/pipeline/loaders/equity_pricing_loader.py index 99c8ae00..4d9ec055 100644 --- a/catalyst/pipeline/loaders/equity_pricing_loader.py +++ b/catalyst/pipeline/loaders/equity_pricing_loader.py @@ -36,15 +36,34 @@ class USEquityPricingLoader(PipelineLoader): Delegates loading of baselines and adjustments. """ - def __init__(self, raw_price_loader, adjustments_loader, dataset): - self.raw_price_loader = raw_price_loader - self.adjustments_loader = adjustments_loader + def __init__(self, bundle, data_frequency, dataset): + + if data_frequency == 'daily': + reader = bundle.daily_bar_reader + elif data_frequency == '5-minute': + reader = bundle.five_minute_bar_reader + elif daily_bar_reader == 'minute': + reader = bundle.minute_bar_reader + else: + raise ValueError( + 'Invalid data frequency: {}'.format(data_frequency) + ) + + cal = reader.trading_calendar or get_calendar('NYSE') + + if data_frequency == 'daily': + all_sessions = cal.all_sessions + elif data_frequency == '5-minute': + reader = bundle.five_minute_bar_reader + all_sessions = cal.all_five_minutes + elif daily_bar_reader == 'minute': + reader = bundle.minute_bar_reader + all_sessions = cal.all_minutes + + self.raw_price_loader = reader + self.adjustments_loader = bundle.adjustments_loader self._columns = dataset.columns - - cal = self.raw_price_loader.trading_calendar or \ - get_calendar("NYSE") - - self._all_sessions = cal.all_sessions + self._all_sessions = all_sessions @classmethod def from_files(cls, pricing_path, adjustments_path): diff --git a/catalyst/sources/benchmark_source.py b/catalyst/sources/benchmark_source.py index 05d5c601..846b7eb5 100644 --- a/catalyst/sources/benchmark_source.py +++ b/catalyst/sources/benchmark_source.py @@ -65,6 +65,19 @@ class BenchmarkSource(object): ) self._precalculated_series = minute_series + elif self.emission_rate == '5-minute': + five_minutes = \ + trading_calendar.five_minutes_for_sessions_in_range( + sessions[0], + sessions[-1], + ) + + five_minute_series = daily_series.reindex( + index=five_minutes, + method='ffill', + ) + + self._precalculated_series = five_minute_series else: self._precalculated_series = daily_series else: @@ -155,6 +168,21 @@ class BenchmarkSource(object): ffill=True )[asset] + return benchmark_series.pct_change()[1:] + elif self.emission_rate == '5-minute': + five_minutes = trading_calendar.five_minutes_for_sessions_in_range( + self.sessions[0], self.sessions[-1] + ) + benchmark_series = data_portal.get_history_window( + [asset], + five_minutes[-1], + bar_count=len(five_minutes) + 1, + frequency='5m', + field='price', + data_frequency=self.emission_rate, + ffill=True, + )[asset] + return benchmark_series.pct_change()[1:] else: start_date = asset.start_date diff --git a/catalyst/utils/calendars/calendar_utils.py b/catalyst/utils/calendars/calendar_utils.py index 0e4824a7..6ec335a5 100644 --- a/catalyst/utils/calendars/calendar_utils.py +++ b/catalyst/utils/calendars/calendar_utils.py @@ -25,7 +25,7 @@ _default_calendar_factories = { 'us_futures': QuantopianUSFuturesCalendar, } _default_calendar_aliases = { - 'CATX': 'OPEN', + 'POLO': 'OPEN', 'NASDAQ': 'NYSE', 'BATS': 'NYSE', 'CBOT': 'CME', diff --git a/catalyst/utils/calendars/exchange_calendar_open.py b/catalyst/utils/calendars/exchange_calendar_open.py index cf62b8f0..1dbc8cd3 100644 --- a/catalyst/utils/calendars/exchange_calendar_open.py +++ b/catalyst/utils/calendars/exchange_calendar_open.py @@ -1,6 +1,7 @@ from datetime import time from pytz import timezone +from pandas import Timestamp from pandas.tseries.offsets import DateOffset from catalyst.utils.memoize import lazyval @@ -28,3 +29,6 @@ class OpenExchangeCalendar(TradingCalendar): @lazyval def day(self): return DateOffset(days=1) + + def __init__(self, *args, **kwargs): + super(OpenExchangeCalendar, self).__init__(start=Timestamp('2015-03-01', tz='UTC'), **kwargs) diff --git a/catalyst/utils/calendars/trading_calendar.py b/catalyst/utils/calendars/trading_calendar.py index 29a16f97..a1489067 100644 --- a/catalyst/utils/calendars/trading_calendar.py +++ b/catalyst/utils/calendars/trading_calendar.py @@ -117,6 +117,9 @@ class TradingCalendar(with_metaclass(ABCMeta)): self._trading_minutes_nanos = self.all_minutes.values.\ astype(np.int64) + + self._trading_five_minutes_nanos = self.all_five_minutes.values.\ + astype(np.int64) self.first_trading_session = _all_days[0] self.last_trading_session = _all_days[-1] @@ -179,6 +182,18 @@ class TradingCalendar(with_metaclass(ABCMeta)): """ return int(self._minutes_per_session[start_session:end_session].sum()) + @lazyval + def _five_minutes_per_session(self): + diff = self.schedule.market_close - self.schedule.market_open + diff = diff.astype('timedelta64[m]') + return (diff + 1) // 5 + + def five_minutes_count_for_sessions_in_range(self, + start_session, + end_session): + five_mins = self._five_minutes_per_session[start_session:end_session] + return int(five_mins.sum()) + @property def regular_holidays(self): """ @@ -371,6 +386,10 @@ class TradingCalendar(with_metaclass(ABCMeta)): idx = next_divider_idx(self._trading_minutes_nanos, dt.value) return self.all_minutes[idx] + def next_five_minute(self, dt): + idx = next_divider_idx(self._trading_five_minutes_nanos, dt.values) + return self.all_five_mintutes[idx] + def previous_minute(self, dt): """ Given a dt, return the previous exchange minute. @@ -465,6 +484,12 @@ class TradingCalendar(with_metaclass(ABCMeta)): end_minute=self.schedule.at[session_label, 'market_close'], ) + def five_minutes_for_session(self, session_label): + return self.five_minutes_in_range( + start_five_minute=self.schedule.at[session_label, 'market_open'], + end_five_minute=self.schedule.at[session_label, 'market_close'], + ) + def minutes_window(self, start_dt, count): start_dt_nanos = start_dt.value all_minutes_nanos = self._trading_minutes_nanos @@ -566,6 +591,20 @@ class TradingCalendar(with_metaclass(ABCMeta)): return abs(end_idx - start_idx) + def five_minutes_in_range(self, start_five_minute, end_five_minute): + start_idx = searchsorted(self._trading_five_minutes_nanos, + start_five_minute.value) + + end_idx = searchsorted(self._trading_five_minutes_nanos, + end_five_minute.value) + + if end_five_minute.value == self._trading_five_minutes_nanos[end_idx]: + # if the end minute is a market minute, increase by 1 + end_idx += 1 + + return self.all_five_minutes[start_idx:end_idx] + + def minutes_in_range(self, start_minute, end_minute): """ Given start and end minutes, return all the calendar minutes @@ -623,6 +662,15 @@ class TradingCalendar(with_metaclass(ABCMeta)): return self.minutes_in_range(first_minute, last_minute) + def five_minutes_for_sessions_in_range(self, + start_session_label, + end_session_label): + + first_minute, _ = self.open_and_close_for_session(start_session_label) + _, last_minute = self.open_and_close_for_session(end_session_label) + + return self.five_minutes_in_range(first_minute, last_minute) + def open_and_close_for_session(self, session_label): """ Returns a tuple of timestamps of the open and close of the session @@ -690,8 +738,7 @@ class TradingCalendar(with_metaclass(ABCMeta)): def execution_time_from_close(self, close_dates): return close_dates - @lazyval - def all_minutes(self): + def _all_minutes_with_interval(self, interval): """ Returns a DatetimeIndex representing all the minutes in this calendar. """ @@ -703,8 +750,10 @@ class TradingCalendar(with_metaclass(ABCMeta)): deltas = closes_in_ns - opens_in_ns + nanos_in_interval = interval * NANOS_IN_MINUTE + # + 1 because we want 390 days per standard day, not 389 - daily_sizes = (deltas / NANOS_IN_MINUTE) + 1 + daily_sizes = (deltas / nanos_in_interval) + 1 num_minutes = np.sum(daily_sizes).astype(np.int64) # One allocation for the entire thing. This assumes that each day @@ -721,13 +770,27 @@ class TradingCalendar(with_metaclass(ABCMeta)): np.arange( opens_in_ns[day_idx], closes_in_ns[day_idx] + NANOS_IN_MINUTE, - NANOS_IN_MINUTE + nanos_in_interval ) idx += size_int return DatetimeIndex(all_minutes).tz_localize("UTC") + @lazyval + def all_five_minutes(self): + """ + Returns a DatetimeIndex representing all the five minutes in this calendar. + """ + return self._all_minutes_with_interval(5) + + @lazyval + def all_minutes(self): + """ + Returns a DatetimeIndex representing all the minutes in this calendar. + """ + return self._all_minutes_with_interval(1) + @preprocess(dt=coerce(pd.Timestamp, attrgetter('value'))) def minute_to_session_label(self, dt, direction="next"): """ diff --git a/catalyst/utils/cli.py b/catalyst/utils/cli.py index 70e15d35..51e519e0 100644 --- a/catalyst/utils/cli.py +++ b/catalyst/utils/cli.py @@ -1,10 +1,34 @@ +from itertools import count + import click import pandas as pd from .context_tricks import CallbackManager +DEFAULT_BAR_TEMPLATE = ' [%(bar)s] %(label)s: %(info)s' +DEFAULT_EMPTY_CHAR = ' ' +DEFAULT_FILL_CHAR = '=' -def maybe_show_progress(it, show_progress, **kwargs): +def item_show_count(total=None): + def maybe_show_total(index): + if total is not None: + return '{0}/{1}'.format(index, total) + return str(index) + + def item_show_func(item, _it=iter(count())): + if item is not None: + starting = False + return maybe_show_total(next(_it)) + return 'DONE' + + return item_show_func + +def maybe_show_progress(it, + show_progress, + empty_char=DEFAULT_EMPTY_CHAR, + fill_char=DEFAULT_FILL_CHAR, + bar_template=DEFAULT_BAR_TEMPLATE, + **kwargs): """Optionally show a progress bar for the given iterator. Parameters @@ -30,6 +54,9 @@ def maybe_show_progress(it, show_progress, **kwargs): ... """ if show_progress: + kwargs['bar_template'] = bar_template + kwargs['empty_char'] = empty_char + kwargs['fill_char'] = fill_char return click.progressbar(it, **kwargs) # context manager that just return `it` when we enter it diff --git a/catalyst/utils/events.py b/catalyst/utils/events.py index 10bb2f55..3fc83dd4 100644 --- a/catalyst/utils/events.py +++ b/catalyst/utils/events.py @@ -602,6 +602,7 @@ class date_rules(object): class time_rules(object): market_open = AfterOpen market_close = BeforeClose + every_5_minutes = Always every_minute = Always diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index d4b75207..d63c314a 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -156,15 +156,15 @@ def _run(handle_data, environ=environ, ) - first_trading_day =\ - bundle_data.equity_minute_bar_reader.first_trading_day + first_trading_day = bundle_data.minute_bar_reader.first_trading_day data = DataPortal( env.asset_finder, open_calendar, first_trading_day=first_trading_day, - equity_minute_reader=bundle_data.equity_minute_bar_reader, - equity_daily_reader=bundle_data.equity_daily_bar_reader, + minute_reader=bundle_data.minute_bar_reader, + five_minute_reader=bundle_data.five_minute_bar_reader, + daily_reader=bundle_data.daily_bar_reader, adjustment_reader=bundle_data.adjustment_reader, ) @@ -179,13 +179,14 @@ def _run(handle_data, if b == 'poloniex': return CryptoPricingLoader( - bundle_data.equity_daily_bar_reader, + bundle_data, + data_frequency, CryptoPricing, ) - elif b == 'quantopian-quandl': + elif b == 'quandl': return USEquityPricingLoader( - bundle_data.equity_daily_bar_reader, - bundle_data.adjustment_reader, + bundle_data, + data_frequency, USEquityPricing, ) raise ValueError( @@ -216,6 +217,7 @@ def _run(handle_data, end=end, capital_base=capital_base, data_frequency=data_frequency, + emission_rate=data_frequency, ), **{ 'initialize': initialize,