mirror of
https://github.com/wassname/catalyst.git
synced 2026-07-04 13:03:26 +08:00
Merge pull request #841 from quantopian/remember-downloads
Remember downloads
This commit is contained in:
@@ -69,3 +69,6 @@ Miscellaneous
|
||||
without ``nose_parameterized.expand`` which bloats the test output
|
||||
(:issue:`833`).
|
||||
* Limits timer report in test output to 15 longest tests (:issue:`838`).
|
||||
* Treasury and benchmark downloads will now wait at least an hour before attempting to download
|
||||
again if data returned from a remote source does not extend to the date
|
||||
expected (:issue:`841`).
|
||||
|
||||
+56
-6
@@ -12,6 +12,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from __future__ import print_function
|
||||
import os
|
||||
from collections import OrderedDict
|
||||
|
||||
@@ -47,6 +48,15 @@ INDEX_MAPPING = {
|
||||
(treasuries, 'treasury_curves.csv', 'www.federalreserve.gov'),
|
||||
}
|
||||
|
||||
ONE_HOUR = pd.Timedelta(hours=1)
|
||||
|
||||
|
||||
def last_modified_time(path):
|
||||
"""
|
||||
Get the last modified time of path as a Timestamp.
|
||||
"""
|
||||
return pd.Timestamp(os.path.getmtime(path), unit='s', tz='UTC')
|
||||
|
||||
|
||||
def get_data_filepath(name):
|
||||
"""
|
||||
@@ -128,6 +138,7 @@ def load_market_data(trading_day=trading_day_nyse,
|
||||
'1year','2year','3year','5year','7year','10year','20year','30year'
|
||||
"""
|
||||
first_date = trading_days[0]
|
||||
now = pd.Timestamp.utcnow()
|
||||
|
||||
# We expect to have benchmark and treasury data that's current up until
|
||||
# **two** full trading days prior to the most recently completed trading
|
||||
@@ -143,14 +154,13 @@ def load_market_data(trading_day=trading_day_nyse,
|
||||
|
||||
# We'll attempt to download new data if the latest entry in our cache is
|
||||
# before this date.
|
||||
last_date = trading_days[
|
||||
trading_days.get_loc(pd.Timestamp.utcnow(), method='ffill') - 2
|
||||
]
|
||||
last_date = trading_days[trading_days.get_loc(now, method='ffill') - 2]
|
||||
|
||||
benchmark_returns = ensure_benchmark_data(
|
||||
bm_symbol,
|
||||
first_date,
|
||||
last_date,
|
||||
now,
|
||||
# We need the trading_day to figure out the close prior to the first
|
||||
# date so that we can compute returns for the first date.
|
||||
trading_day,
|
||||
@@ -159,11 +169,12 @@ def load_market_data(trading_day=trading_day_nyse,
|
||||
bm_symbol,
|
||||
first_date,
|
||||
last_date,
|
||||
now,
|
||||
)
|
||||
return benchmark_returns, treasury_curves
|
||||
|
||||
|
||||
def ensure_benchmark_data(symbol, first_date, last_date, trading_day):
|
||||
def ensure_benchmark_data(symbol, first_date, last_date, now, trading_day):
|
||||
"""
|
||||
Ensure we have benchmark data for `symbol` from `first_date` to `last_date`
|
||||
|
||||
@@ -175,6 +186,10 @@ def ensure_benchmark_data(symbol, first_date, last_date, trading_day):
|
||||
First required date for the cache.
|
||||
last_date : pd.Timestamp
|
||||
Last required date for the cache.
|
||||
now : pd.Timestamp
|
||||
The current time. This is used to prevent repeated attempts to
|
||||
re-download data that isn't available due to scheduling quirks or other
|
||||
failures.
|
||||
trading_day : pd.CustomBusinessDay
|
||||
A trading day delta. Used to find the day before first_date so we can
|
||||
get the close of the day prior to first_date.
|
||||
@@ -182,12 +197,28 @@ def ensure_benchmark_data(symbol, first_date, last_date, trading_day):
|
||||
We attempt to download data unless we already have data stored at the data
|
||||
cache for `symbol` whose first entry is before or on `first_date` and whose
|
||||
last entry is on or after `last_date`.
|
||||
|
||||
If we perform a download and the cache criteria are not satisfied, we wait
|
||||
at least one hour before attempting a redownload. This is determined by
|
||||
comparing the current time to the result of os.path.getmtime on the cache
|
||||
path.
|
||||
"""
|
||||
path = get_data_filepath(get_benchmark_filename(symbol))
|
||||
try:
|
||||
data = pd.Series.from_csv(path).tz_localize('UTC')
|
||||
if has_data_for_dates(data, first_date, last_date):
|
||||
return data
|
||||
|
||||
# Don't re-download if we've successfully downloaded and written a file
|
||||
# in the last hour.
|
||||
last_download_time = last_modified_time(path)
|
||||
if (now - last_download_time) <= ONE_HOUR:
|
||||
logger.warn(
|
||||
"Refusing to download new benchmark "
|
||||
"data because a download succeeded at %s." % last_download_time
|
||||
)
|
||||
return data
|
||||
|
||||
except (OSError, IOError, ValueError) as e:
|
||||
# These can all be raised by various versions of pandas on various
|
||||
# classes of malformed input. Treat them all as cache misses.
|
||||
@@ -212,7 +243,7 @@ def ensure_benchmark_data(symbol, first_date, last_date, trading_day):
|
||||
return data
|
||||
|
||||
|
||||
def ensure_treasury_data(bm_symbol, first_date, last_date):
|
||||
def ensure_treasury_data(bm_symbol, first_date, last_date, now):
|
||||
"""
|
||||
Ensure we have treasury data from treasury module associated with
|
||||
`bm_symbol`.
|
||||
@@ -225,10 +256,19 @@ def ensure_treasury_data(bm_symbol, first_date, last_date):
|
||||
First date required to be in the cache.
|
||||
last_date : pd.Timestamp
|
||||
Last date required to be in the cache.
|
||||
now : pd.Timestamp
|
||||
The current time. This is used to prevent repeated attempts to
|
||||
re-download data that isn't available due to scheduling quirks or other
|
||||
failures.
|
||||
|
||||
We attempt to download data unless we already have data stored in the cache
|
||||
for `module_name` whose first entry is before or on `first_date` and whose
|
||||
last entry is on or after `last_date`.
|
||||
|
||||
If we perform a download and the cache criteria are not satisfied, we wait
|
||||
at least one hour before attempting a redownload. This is determined by
|
||||
comparing the current time to the result of os.path.getmtime on the cache
|
||||
path.
|
||||
"""
|
||||
loader_module, filename, source = INDEX_MAPPING.get(
|
||||
bm_symbol, INDEX_MAPPING['^GSPC']
|
||||
@@ -239,6 +279,17 @@ def ensure_treasury_data(bm_symbol, first_date, last_date):
|
||||
data = pd.DataFrame.from_csv(path).tz_localize('UTC')
|
||||
if has_data_for_dates(data, first_date, last_date):
|
||||
return data
|
||||
|
||||
# Don't re-download if we've successfully downloaded and written a file
|
||||
# in the last hour.
|
||||
last_download_time = last_modified_time(path)
|
||||
if (now - last_download_time) <= ONE_HOUR:
|
||||
logger.warn(
|
||||
"Refusing to download new treasury "
|
||||
"data because a download succeeded at %s." % last_download_time
|
||||
)
|
||||
return data
|
||||
|
||||
except (OSError, IOError, ValueError) as e:
|
||||
# These can all be raised by various versions of pandas on various
|
||||
# classes of malformed input. Treat them all as cache misses.
|
||||
@@ -273,7 +324,6 @@ def _load_raw_yahoo_data(indexes=None, stocks=None, start=None, end=None):
|
||||
This is based on code presented in a talk by Wes McKinney:
|
||||
http://wesmckinney.com/files/20111017/notebook_output.pdf
|
||||
"""
|
||||
|
||||
assert indexes is not None or stocks is not None, """
|
||||
must specify stocks or indexes"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user