From 67bd5c8f6a74bf55fd273e52a90b8e7631bca710 Mon Sep 17 00:00:00 2001 From: Frederic Fortier Date: Fri, 22 Dec 2017 15:45:23 -0500 Subject: [PATCH] BLD: improvements following unit tests --- catalyst/exchange/asset_finder_exchange.py | 66 ++++++++ catalyst/exchange/exchange_algorithm.py | 3 +- catalyst/exchange/exchange_data_portal.py | 2 +- catalyst/exchange/exchange_pricing_loader.py | 158 +++++++++++++++++++ catalyst/exchange/stats_utils.py | 32 ++++ catalyst/pipeline/engine.py | 5 +- catalyst/utils/run_algo.py | 13 +- tests/exchange/test_suite_bundle.py | 12 +- 8 files changed, 279 insertions(+), 12 deletions(-) create mode 100644 catalyst/exchange/exchange_pricing_loader.py diff --git a/catalyst/exchange/asset_finder_exchange.py b/catalyst/exchange/asset_finder_exchange.py index 2cf3aa4e..148aab2f 100644 --- a/catalyst/exchange/asset_finder_exchange.py +++ b/catalyst/exchange/asset_finder_exchange.py @@ -1,6 +1,9 @@ from logbook import Logger from catalyst.constants import LOG_LEVEL +from catalyst.exchange.factory import find_exchanges + +import pandas as pd log = Logger('AssetFinderExchange', level=LOG_LEVEL) @@ -97,3 +100,66 @@ class AssetFinderExchange(object): asset = exchange.get_asset(symbol, data_frequency) self._asset_cache[key] = asset return asset + + def lifetimes(self, dates, include_start_date): + """ + Compute a DataFrame representing asset lifetimes for the specified date + range. + + Parameters + ---------- + dates : pd.DatetimeIndex + The dates for which to compute lifetimes. + include_start_date : bool + Whether or not to count the asset as alive on its start_date. + + This is useful in a backtesting context where `lifetimes` is being + used to signify "do I have data for this asset as of the morning of + this date?" For many financial metrics, (e.g. daily close), data + isn't available for an asset until the end of the asset's first + day. + + Returns + ------- + lifetimes : pd.DataFrame + A frame of dtype bool with `dates` as index and an Int64Index of + assets as columns. The value at `lifetimes.loc[date, asset]` will + be True iff `asset` existed on `date`. If `include_start_date` is + False, then lifetimes.loc[date, asset] will be false when date == + asset.start_date. + + See Also + -------- + numpy.putmask + catalyst.pipeline.engine.SimplePipelineEngine._compute_root_mask + """ + exchanges = find_exchanges(features=['minuteBundle']) + if not exchanges: + raise ValueError('exchange with minute bundles not found') + + # TODO: find a way to support multiple exchanges + exchange = exchanges[0] + # Using a single exchange for now because are not unique for the + # same asset in different exchanges. I'd like to avoid binding + # pipeline to a single exchange. + exchange.init() + + data = [] + for dt in dates: + exists = [] + + for asset in exchange.assets: + if include_start_date: + condition = (asset.start_date <= dt < asset.end_minute) + + else: + condition = (asset.start_date < dt < asset.end_minute) + + exists.append(condition) + + data.append(exists) + + sids = [asset.sid for asset in exchange.assets] + df = pd.DataFrame(data, index=dates, columns=sids) + + return df diff --git a/catalyst/exchange/exchange_algorithm.py b/catalyst/exchange/exchange_algorithm.py index cc82b8aa..0c602faf 100644 --- a/catalyst/exchange/exchange_algorithm.py +++ b/catalyst/exchange/exchange_algorithm.py @@ -677,7 +677,8 @@ class ExchangeTradingAlgorithmLive(ExchangeTradingAlgorithmBase): self.perf_tracker.update_performance() frame_stats = self.prepare_period_stats( - data.current_dt, data.current_dt + timedelta(minutes=1)) + data.current_dt, data.current_dt + timedelta(minutes=1) + ) # Saving the last hour in memory self.frame_stats.append(frame_stats) diff --git a/catalyst/exchange/exchange_data_portal.py b/catalyst/exchange/exchange_data_portal.py index 02d88ca0..0298a786 100644 --- a/catalyst/exchange/exchange_data_portal.py +++ b/catalyst/exchange/exchange_data_portal.py @@ -339,7 +339,7 @@ class DataPortalExchangeBacktest(DataPortalExchangeBase): field=field, data_frequency=adj_data_frequency, algo_end_dt=self._last_available_session, - trailing_bar_count=trailing_bar_count + trailing_bar_count=trailing_bar_count, ) df = resample_history_df(pd.DataFrame(series), freq, field) diff --git a/catalyst/exchange/exchange_pricing_loader.py b/catalyst/exchange/exchange_pricing_loader.py new file mode 100644 index 00000000..3bf106e9 --- /dev/null +++ b/catalyst/exchange/exchange_pricing_loader.py @@ -0,0 +1,158 @@ +# Copyright 2015 Quantopian, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from numpy import ( + iinfo, + uint32, +) + +from catalyst.data.us_equity_pricing import BcolzDailyBarReader +from catalyst.lib.adjusted_array import AdjustedArray +from catalyst.errors import NoFurtherDataError +from catalyst.pipeline.data import DataSet, Column +from catalyst.pipeline.loaders.base import PipelineLoader +from catalyst.utils.calendars import get_calendar +from catalyst.utils.numpy_utils import float64_dtype + +UINT32_MAX = iinfo(uint32).max + + +class TradingPairPricing(DataSet): + """ + Dataset representing daily trading prices and volumes. + """ + open = Column(float64_dtype) + high = Column(float64_dtype) + low = Column(float64_dtype) + close = Column(float64_dtype) + volume = Column(float64_dtype) + + +class ExchangePricingLoader(PipelineLoader): + """ + PipelineLoader for Crypto Pricing data + + Delegates loading of baselines and adjustments. + """ + + def __init__(self, data_frequency): + + cal = get_calendar('OPEN') + + if data_frequency == 'daily': + reader = None + all_sessions = cal.all_sessions + + elif data_frequency == 'minute': + reader = None + all_sessions = cal.all_minutes + + else: + raise ValueError( + 'Invalid data frequency: {}'.format(data_frequency) + ) + + self.raw_price_loader = reader + self._columns = TradingPairPricing.columns + self._all_sessions = all_sessions + + @classmethod + def from_files(cls, pricing_path): + """ + Create a loader from a bcolz equity pricing dir and a SQLite + adjustments path. + + Parameters + ---------- + pricing_path : str + Path to a bcolz directory written by a BcolzDailyBarWriter. + """ + return cls( + BcolzDailyBarReader(pricing_path), + ) + + def load_adjusted_array(self, columns, dates, assets, mask): + # load_adjusted_array is called with dates on which the user's algo + # will be shown data, which means we need to return the data that would + # be known at the start of each date. We assume that the latest data + # known on day N is the data from day (N - 1), so we shift all query + # dates back by a day. + start_date, end_date = _shift_dates( + self._all_sessions, dates[0], dates[-1], shift=1, + ) + colnames = [c.name for c in columns] + raw_arrays = self.raw_price_loader.load_raw_arrays( + colnames, + start_date, + end_date, + assets, + ) + + out = {} + for c, c_raw in zip(columns, raw_arrays): + out[c] = AdjustedArray( + c_raw.astype(c.dtype), + mask, + {}, + c.missing_value, + ) + return out + + @property + def columns(self): + return self._columns + + +def _shift_dates(dates, start_date, end_date, shift): + try: + start = dates.get_loc(start_date) + except KeyError: + if start_date < dates[0]: + raise NoFurtherDataError( + msg=( + "Pipeline Query requested data starting on {query_start}, " + "but first known date is {calendar_start}" + ).format( + query_start=str(start_date), + calendar_start=str(dates[0]), + ) + ) + else: + raise ValueError("Query start %s not in calendar" % start_date) + + # Make sure that shifting doesn't push us out of the calendar. + if start < shift: + raise NoFurtherDataError( + msg=( + "Pipeline Query requested data from {shift}" + " days before {query_start}, but first known date is only " + "{start} days earlier." + ).format(shift=shift, query_start=start_date, start=start), + ) + + try: + end = dates.get_loc(end_date) + except KeyError: + if end_date > dates[-1]: + raise NoFurtherDataError( + msg=( + "Pipeline Query requesting data up to {query_end}, " + "but last known date is {calendar_end}" + ).format( + query_end=end_date, + calendar_end=dates[-1], + ) + ) + else: + raise ValueError("Query end %s not in calendar" % end_date) + return dates[start - shift], dates[end - shift] diff --git a/catalyst/exchange/stats_utils.py b/catalyst/exchange/stats_utils.py index 806cc4f8..ec9ab5e8 100644 --- a/catalyst/exchange/stats_utils.py +++ b/catalyst/exchange/stats_utils.py @@ -1,5 +1,6 @@ import copy import csv +import json import numbers import os import time @@ -9,8 +10,10 @@ import pandas as pd from catalyst.assets._assets import TradingPair from catalyst.exchange.exchange_utils import get_algo_folder +from catalyst.utils.paths import data_root s3_conn = [] +mailgun = [] def trend_direction(series): @@ -351,6 +354,35 @@ def stats_to_s3(uri, stats, algo_namespace, recorded_cols=None, obj.put(Body=bytes_to_write) +def email_error(algo_name, dt, e, environ=None): + import requests + import traceback + + if not mailgun: + root = data_root(environ) + filename = os.path.join(root, 'mailgun.json') + if not os.path.exists(filename): + raise ValueError( + 'mailgun.json not found in the catalyst data folder' + ) + + with open(filename) as data_file: + mailgun.append(json.load(data_file)) + + mg = mailgun[0] + + return requests.post( + mg['url'], + auth=("api", mg['api']), + data={ + "from": mg['from'], + "to": mg['to'], + "subject": 'Error: {}'.format(algo_name), + "text": '{}\n\n{}\n{}'.format( + dt, e, traceback.format_exc() + )}) + + def stats_to_algo_folder(stats, algo_namespace, recorded_cols=None): """ Saves the performance stats to the algo local folder. diff --git a/catalyst/pipeline/engine.py b/catalyst/pipeline/engine.py index 8b8eb4b6..4a6538fc 100644 --- a/catalyst/pipeline/engine.py +++ b/catalyst/pipeline/engine.py @@ -33,7 +33,6 @@ from catalyst.utils.sharedoc import copydoc class PipelineEngine(with_metaclass(ABCMeta)): - @abstractmethod def run_pipeline(self, pipeline, start_date, end_date): """ @@ -118,6 +117,7 @@ class ExplodingPipelineEngine(PipelineEngine): """ A PipelineEngine that doesn't do anything. """ + def run_pipeline(self, pipeline, start_date, end_date): raise NoEngineRegistered( "Attempted to run a pipeline but no pipeline " @@ -484,8 +484,9 @@ class SimplePipelineEngine(PipelineEngine): ) if isinstance(term, LoadableTerm): + term_key = loader_group_key(term) to_load = sorted( - loader_groups[loader_group_key(term)], + loader_groups[term_key], key=lambda t: t.dataset ) loader = get_loader(term) diff --git a/catalyst/utils/run_algo.py b/catalyst/utils/run_algo.py index 153e3af6..06c2e69b 100644 --- a/catalyst/utils/run_algo.py +++ b/catalyst/utils/run_algo.py @@ -12,7 +12,11 @@ from logbook import Logger from catalyst.data.bundles import load from catalyst.data.data_portal import DataPortal +from catalyst.exchange.exchange_pricing_loader import ExchangePricingLoader, \ + TradingPairPricing from catalyst.exchange.factory import get_exchange +from catalyst.pipeline import USEquityPricingLoader +from catalyst.pipeline.data import USEquityPricing try: from pygments import highlight @@ -173,7 +177,14 @@ def _run(handle_data, asset_db_path=None # We don't need an asset db, we have exchanges ) env.asset_finder = AssetFinderExchange() - choose_loader = None # TODO: use the DataPortal in the algo class for this + + def choose_loader(column): + bound_cols = TradingPairPricing.columns + if column in bound_cols: + return ExchangePricingLoader(data_frequency) + raise ValueError( + "No PipelineLoader registered for column %s." % column + ) if live: start = pd.Timestamp.utcnow() diff --git a/tests/exchange/test_suite_bundle.py b/tests/exchange/test_suite_bundle.py index 482bdb9c..5651d7f6 100644 --- a/tests/exchange/test_suite_bundle.py +++ b/tests/exchange/test_suite_bundle.py @@ -1,17 +1,15 @@ import random -from datetime import timedelta - -from logbook import Logger -from pandas.util.testing import assert_frame_equal import pandas as pd +from logbook import Logger +from pandas.util.testing import assert_frame_equal from catalyst import get_calendar from catalyst.exchange.asset_finder_exchange import AssetFinderExchange from catalyst.exchange.exchange_data_portal import DataPortalExchangeBacktest from catalyst.exchange.exchange_utils import get_candles_df from catalyst.exchange.factory import get_exchange -from catalyst.exchange.test_utils import select_random_exchanges, output_df, \ +from catalyst.exchange.test_utils import output_df, \ select_random_assets log = Logger('TestSuiteExchange') @@ -94,9 +92,8 @@ class TestSuiteBundle: assert_frame_equal( right=data['bundle'], left=data['exchange'], - check_less_precise=True + check_less_precise=True, ) - pass def test_validate_bundles(self): exchange_population = 3 @@ -144,3 +141,4 @@ class TestSuiteBundle: data_frequency=data_frequency, data_portal=data_portal, ) + pass