mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-27 22:36:38 +08:00
662 lines
24 KiB
Python
662 lines
24 KiB
Python
"""
|
|
Tests for setting up an EventsLoader and a BlazeEventsLoader.
|
|
"""
|
|
from datetime import time
|
|
import itertools
|
|
from itertools import product
|
|
|
|
import blaze as bz
|
|
from nose_parameterized import parameterized
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
from catalyst.pipeline import Pipeline, SimplePipelineEngine
|
|
from catalyst.pipeline.common import (
|
|
EVENT_DATE_FIELD_NAME,
|
|
TS_FIELD_NAME,
|
|
SID_FIELD_NAME,
|
|
)
|
|
from catalyst.pipeline.data import DataSet, Column
|
|
from catalyst.pipeline.loaders.events import EventsLoader
|
|
from catalyst.pipeline.loaders.blaze.events import BlazeEventsLoader
|
|
from catalyst.pipeline.loaders.utils import (
|
|
next_event_indexer,
|
|
normalize_timestamp_to_query_time,
|
|
previous_event_indexer,
|
|
)
|
|
from catalyst.testing import check_arrays, ZiplineTestCase
|
|
from catalyst.testing.fixtures import (
|
|
WithAssetFinder,
|
|
WithTradingSessions,
|
|
)
|
|
from catalyst.testing.predicates import assert_equal
|
|
from catalyst.utils.numpy_utils import (
|
|
categorical_dtype,
|
|
datetime64ns_dtype,
|
|
float64_dtype,
|
|
int64_dtype,
|
|
)
|
|
|
|
|
|
class EventDataSet(DataSet):
    """
    Dataset declaring the columns requested by the events loader tests.

    Every kind of value (event date, float, datetime, int, string, and
    string with a custom missing value) is declared twice: once as a
    ``previous_*`` column and once as a ``next_*`` column.
    """

    # Event dates.
    previous_event_date = Column(dtype=datetime64ns_dtype)
    next_event_date = Column(dtype=datetime64ns_dtype)

    # Floats.
    previous_float = Column(dtype=float64_dtype)
    next_float = Column(dtype=float64_dtype)

    # Datetimes.
    previous_datetime = Column(dtype=datetime64ns_dtype)
    next_datetime = Column(dtype=datetime64ns_dtype)

    # Integers, with an explicit -1 missing value.
    previous_int = Column(missing_value=-1, dtype=int64_dtype)
    next_int = Column(missing_value=-1, dtype=int64_dtype)

    # Strings whose missing value is None.
    previous_string = Column(missing_value=None, dtype=categorical_dtype)
    next_string = Column(missing_value=None, dtype=categorical_dtype)

    # Strings whose missing value is a custom sentinel.
    previous_string_custom_missing = Column(
        missing_value=u"<<NULL>>", dtype=categorical_dtype,
    )
    next_string_custom_missing = Column(
        missing_value=u"<<NULL>>", dtype=categorical_dtype,
    )
|
|
|
|
|
|
# The four dates from which every event_date/timestamp combination used by
# these tests is drawn.
critical_dates = pd.to_datetime(
    ['2014-01-{:02d}'.format(day) for day in (5, 10, 15, 20)]
)
|
|
|
|
|
|
def make_events_for_sid(sid, event_dates, event_timestamps):
    """
    Build a frame of fake events that all belong to ``sid``.

    One row is produced per entry of ``event_dates``; ``event_timestamps``
    is used verbatim as the ``timestamp`` column.  The value columns
    (``float``, ``int``, ``datetime`` and ``string``) are derived from the
    row position and ``sid``.
    """
    count = len(event_dates)
    positions = np.arange(count)
    shifted_days = pd.date_range('1990-01-01', periods=count).shift(sid)
    labels = ['{}-{}'.format(sid, position) for position in range(count)]

    columns = {
        'sid': np.full(count, sid, dtype=np.int64),
        'timestamp': event_timestamps,
        'event_date': event_dates,
        'float': np.arange(count, dtype=np.float64) + sid,
        'int': positions + sid,
        'datetime': shifted_days,
        'string': labels,
    }
    return pd.DataFrame(columns)
|
|
|
|
|
|
def make_null_event_date_events(all_sids, timestamp):
    """
    Build one event per sid in ``all_sids`` whose event_date is NaT.

    Every row carries the same ``timestamp`` and the same placeholder
    values.  Used to test that EventsLoaders filter out null events.
    """
    null_event_columns = {
        'sid': all_sids,
        'timestamp': timestamp,
        # The null event date is the point of these rows.
        'event_date': pd.Timestamp('NaT'),
        # Placeholder values for the remaining columns.
        'float': -9999.0,
        'int': -9999,
        'datetime': pd.Timestamp('1980'),
        'string': 'should be ignored',
    }
    return pd.DataFrame(null_event_columns)
|
|
|
|
|
|
def make_events(add_nulls):
    """
    Build the frame of fake events shared by the tests in this module.

    Every event has at least three pieces of data associated with it:

    1. sid : The ID of the asset associated with the event.
    2. event_date : The date on which an event occurred.
    3. timestamp : The date on which we learned about the event.
                   This can be before the event_date in the case of an
                   announcement about an upcoming event.

    Events for two different sids shouldn't interact in any way, so the
    interesting cases are determined by the possible interleavings of
    event_date and timestamp for a single sid.

    Fix two events with dates e1, e2 and timestamps t1 and t2.

    Without loss of generality, assume that e1 < e2. (If two events have the
    same occurrence date, the behavior of next/previous event is undefined).

    The remaining possible sequences of events are given by taking every
    4-tuple (e1, e2, t1, t2) drawn from ``critical_dates`` with e1 < e2.
    For each such interleaving, we generate a set of fake events with those
    dates and assign them to a new sid.

    Parameters
    ----------
    add_nulls : bool
        If true, additionally append, for every date in ``critical_dates``,
        one event with a null event_date per generated sid, timestamped at
        that date.

    Returns
    -------
    events : pd.DataFrame
        The concatenation of all generated frames, with a fresh index.
    """
    def gen_date_interleavings():
        for e1, e2, t1, t2 in product(critical_dates, repeat=4):
            if e1 < e2:
                yield (e1, e2, t1, t2)

    # Sids are assigned consecutively from 0, one per interleaving.
    event_frames = [
        make_events_for_sid(sid, [e1, e2], [t1, t2])
        for sid, (e1, e2, t1, t2) in enumerate(gen_date_interleavings())
    ]

    if add_nulls:
        # One frame per sid has been built so far, so the sids in use are
        # exactly 0..len(event_frames) - 1.  Compute this before appending
        # the null frames, and without relying on a leaked loop variable.
        all_sids = np.arange(len(event_frames))
        for date in critical_dates:
            event_frames.append(
                make_null_event_date_events(all_sids, timestamp=date)
            )

    return pd.concat(event_frames, ignore_index=True)
|
|
|
|
|
|
class EventIndexerTestCase(ZiplineTestCase):
    """
    Check ``previous_event_indexer`` and ``next_event_indexer`` against a
    direct per-sid, per-date computation of the expected event index.
    """

    @classmethod
    def init_class_fixtures(cls):
        super(EventIndexerTestCase, cls).init_class_fixtures()
        # The checks below assume events are ordered by event_date and that
        # the frame's index matches row position after that ordering.
        ordered = make_events(add_nulls=False).sort_values('event_date')
        ordered.reset_index(inplace=True)
        cls.events = ordered

    def _run_indexer(self, indexer_func, check_column):
        """
        Run ``indexer_func`` over all of January 2014 for every sid in the
        fixture events, then hand each sid's column of the result to
        ``check_column``.
        """
        frame = self.events
        sids_per_event = frame['sid'].values
        dates_per_event = frame['event_date'].values
        timestamps_per_event = frame['timestamp'].values

        all_dates = pd.date_range('2014', '2014-01-31')
        all_sids = np.unique(sids_per_event)

        indexer = indexer_func(
            all_dates,
            all_sids,
            dates_per_event,
            timestamps_per_event,
            sids_per_event,
        )

        # Expected results are computed independently for each sid.
        for column, sid in enumerate(all_sids):
            check_column(frame, all_dates, sid, indexer[:, column])

    def test_previous_event_indexer(self):
        self._run_indexer(
            previous_event_indexer,
            self.check_previous_event_indexer,
        )

    def check_previous_event_indexer(self,
                                     events,
                                     all_dates,
                                     sid,
                                     indexer):
        """
        Verify one sid's column of the previous-event indexer.
        """
        relevant_events = events[events.sid == sid]
        self.assertEqual(len(relevant_events), 2)

        ix1, ix2 = relevant_events.index

        def first_eligible(ix):
            # An event becomes a possible value once we're past both its
            # event_date and its timestamp.
            return max(relevant_events.loc[ix, ['event_date', 'timestamp']])

        event1_first_eligible = first_eligible(ix1)
        event2_first_eligible = first_eligible(ix2)

        for date, computed_index in zip(all_dates, indexer):
            if date >= event2_first_eligible:
                # Event 2 wins whenever it has been seen, even if event 1
                # has been seen too, because events are sorted by
                # event_date.
                expected_index = ix2
            elif date >= event1_first_eligible:
                # Only event 1 has been seen.
                expected_index = ix1
            else:
                # Neither event has been seen: -1 is the sentinel.
                expected_index = -1
            self.assertEqual(computed_index, expected_index)

    def test_next_event_indexer(self):
        self._run_indexer(
            next_event_indexer,
            self.check_next_event_indexer,
        )

    def check_next_event_indexer(self,
                                 events,
                                 all_dates,
                                 sid,
                                 indexer):
        """
        Verify one sid's column of the next-event indexer.
        """
        relevant_events = events[events.sid == sid]
        self.assertEqual(len(relevant_events), 2)

        ix1, ix2 = relevant_events.index
        e1, e2 = relevant_events['event_date']
        t1, t2 = relevant_events['timestamp']

        for date, computed_index in zip(all_dates, indexer):
            # An event is eligible to be the next event on any date between
            # its timestamp and its event_date, inclusive.
            if t1 <= date <= e1:
                # Event 1 is earlier, so it is chosen even if event 2 is
                # eligible as well.
                expected_index = ix1
            elif t2 <= date <= e2:
                # Only event 2 is eligible.
                expected_index = ix2
            else:
                # Neither event is eligible: -1 is the sentinel.
                expected_index = -1
            self.assertEqual(computed_index, expected_index)
|
|
|
|
|
|
class EventsLoaderEmptyTestCase(WithAssetFinder,
                                WithTradingSessions,
                                ZiplineTestCase):
    """
    Check that an EventsLoader built from an empty events frame yields the
    missing value of every column for every (date, asset) pair.
    """
    START_DATE = pd.Timestamp('2014-01-01')
    END_DATE = pd.Timestamp('2014-01-30')

    @classmethod
    def init_class_fixtures(cls):
        # The asset finder settings are assigned before delegating to the
        # parent fixtures.
        cls.ASSET_FINDER_EQUITY_SIDS = [0, 1]
        cls.ASSET_FINDER_EQUITY_SYMBOLS = ['A', 'B']
        super(EventsLoaderEmptyTestCase, cls).init_class_fixtures()

    def frame_containing_all_missing_values(self, index, columns):
        """
        Build a frame indexed by ``index`` with one column per entry of
        ``columns``, each filled with that column's ``missing_value``.

        Parameters
        ----------
        index : pd.Index
            Index of the resulting frame.
        columns : iterable of columns
            Objects exposing ``name``, ``missing_value`` and ``dtype``.

        Returns
        -------
        frame : pd.DataFrame
        """
        # Materialize once so a one-shot iterable can be walked twice.
        columns = list(columns)
        # Use the ``columns`` argument for the data as well as for the
        # dtype fix-up below, so both always describe the same columns.
        frame = pd.DataFrame(
            index=index,
            data={c.name: c.missing_value for c in columns},
        )
        for c in columns:
            # The construction above produces columns of dtype `object` when
            # the missing value is string, but we expect categoricals in the
            # final result.
            if c.dtype == categorical_dtype:
                frame[c.name] = frame[c.name].astype('category')
        return frame

    def test_load_empty(self):
        """
        For the case where raw data is empty, make sure we have a result for
        all sids, that the dimensions are correct, and that we have the
        correct missing value.
        """
        raw_events = pd.DataFrame(
            columns=["sid",
                     "timestamp",
                     "event_date",
                     "float",
                     "int",
                     "datetime",
                     "string"]
        )
        next_value_columns = {
            EventDataSet.next_datetime: 'datetime',
            EventDataSet.next_event_date: 'event_date',
            EventDataSet.next_float: 'float',
            EventDataSet.next_int: 'int',
            EventDataSet.next_string: 'string',
            EventDataSet.next_string_custom_missing: 'string'
        }
        previous_value_columns = {
            EventDataSet.previous_datetime: 'datetime',
            EventDataSet.previous_event_date: 'event_date',
            EventDataSet.previous_float: 'float',
            EventDataSet.previous_int: 'int',
            EventDataSet.previous_string: 'string',
            EventDataSet.previous_string_custom_missing: 'string'
        }
        loader = EventsLoader(
            raw_events, next_value_columns, previous_value_columns
        )
        engine = SimplePipelineEngine(
            lambda x: loader,
            self.trading_days,
            self.asset_finder,
        )

        results = engine.run_pipeline(
            Pipeline({c.name: c.latest for c in EventDataSet.columns}),
            start_date=self.trading_days[0],
            end_date=self.trading_days[-1],
        )

        assets = self.asset_finder.retrieve_all(self.ASSET_FINDER_EQUITY_SIDS)
        dates = self.trading_days

        expected = self.frame_containing_all_missing_values(
            index=pd.MultiIndex.from_product([dates, assets]),
            columns=EventDataSet.columns,
        )

        assert_equal(results, expected)
|
|
|
|
|
|
class EventsLoaderTestCase(WithAssetFinder,
                           WithTradingSessions,
                           ZiplineTestCase):
    """
    Run pipelines over every EventDataSet column through an EventsLoader
    and compare each (date, asset) value with the value expected from the
    raw events.

    Subclasses can swap the loader under test by overriding ``make_loader``.
    """

    START_DATE = pd.Timestamp('2014-01-01')
    END_DATE = pd.Timestamp('2014-01-30')

    @classmethod
    def init_class_fixtures(cls):
        # This is a rare case where we actually want to do work **before** we
        # call init_class_fixtures. We choose our sids for WithAssetFinder
        # based on the events generated by make_events.
        cls.raw_events = make_events(add_nulls=True)
        cls.raw_events_no_nulls = cls.raw_events[
            cls.raw_events['event_date'].notnull()
        ]
        cls.next_value_columns = {
            EventDataSet.next_datetime: 'datetime',
            EventDataSet.next_event_date: 'event_date',
            EventDataSet.next_float: 'float',
            EventDataSet.next_int: 'int',
            EventDataSet.next_string: 'string',
            EventDataSet.next_string_custom_missing: 'string'
        }
        cls.previous_value_columns = {
            EventDataSet.previous_datetime: 'datetime',
            EventDataSet.previous_event_date: 'event_date',
            EventDataSet.previous_float: 'float',
            EventDataSet.previous_int: 'int',
            EventDataSet.previous_string: 'string',
            EventDataSet.previous_string_custom_missing: 'string'
        }
        cls.loader = cls.make_loader(
            events=cls.raw_events,
            next_value_columns=cls.next_value_columns,
            previous_value_columns=cls.previous_value_columns,
        )
        cls.ASSET_FINDER_EQUITY_SIDS = list(cls.raw_events['sid'].unique())
        cls.ASSET_FINDER_EQUITY_SYMBOLS = [
            's' + str(n) for n in cls.ASSET_FINDER_EQUITY_SIDS
        ]
        super(EventsLoaderTestCase, cls).init_class_fixtures()

    @classmethod
    def make_loader(cls, events, next_value_columns, previous_value_columns):
        # This method exists to be overridden by BlazeEventsLoaderTestCase
        return EventsLoader(events, next_value_columns, previous_value_columns)

    def _run_and_check_all_columns(self, dates):
        """
        Run a pipeline of every EventDataSet column from ``dates[0]`` to
        ``dates[-1]`` and check each column's results over ``dates``.
        """
        engine = SimplePipelineEngine(
            lambda x: self.loader,
            self.trading_days,
            self.asset_finder,
        )

        results = engine.run_pipeline(
            Pipeline({c.name: c.latest for c in EventDataSet.columns}),
            start_date=dates[0],
            end_date=dates[-1],
        )

        for c in EventDataSet.columns:
            if c in self.next_value_columns:
                check = self.check_next_value_results
            elif c in self.previous_value_columns:
                check = self.check_previous_value_results
            else:
                raise AssertionError("Unexpected column %s." % c)
            check(c, results[c.name].unstack(), dates)

    def test_load_with_trading_calendar(self):
        self._run_and_check_all_columns(self.trading_days)

    def test_load_properly_forward_fills(self):
        # Cut the dates in half so we need to forward fill some data which
        # is not in our window. The results should be computed the same as if
        # we had computed across the entire window and then sliced after the
        # computation.
        #
        # Floor division keeps the slice bound an integer; true division
        # would produce a float on Python 3.
        dates = self.trading_days[len(self.trading_days) // 2:]
        self._run_and_check_all_columns(dates)

    def assert_result_contains_all_sids(self, results):
        """
        Check that the columns of ``results`` are exactly the fixture sids.
        """
        assert_equal(
            list(map(int, results.columns)),
            self.ASSET_FINDER_EQUITY_SIDS,
        )

    def check_previous_value_results(self, column, results, dates):
        """
        Check previous value results for a single column.

        Parameters
        ----------
        column : a key of ``self.previous_value_columns``
            The dataset column whose results are being checked.
        results : pd.DataFrame
            Unstacked pipeline output for ``column``; iterated column by
            column, with each column label exposing ``.sid``.
        dates : pd.DatetimeIndex
            The dates the pipeline was run over.
        """
        # Verify that we got a result for every sid.
        self.assert_result_contains_all_sids(results)

        events = self.raw_events_no_nulls
        # Remove timezone info from trading days, since the outputs
        # from pandas won't be tz_localized.
        dates = dates.tz_localize(None)

        # NOTE(review): DataFrame.iteritems was removed in pandas 2.0; if the
        # pandas pin is ever raised this needs to become ``.items()``.
        for asset, asset_result in results.iteritems():
            relevant_events = events[events.sid == asset.sid]
            self.assertEqual(len(relevant_events), 2)

            v1, v2 = relevant_events[self.previous_value_columns[column]]
            # An event becomes a possible value once we're past both its
            # event_date and its timestamp.
            event1_first_eligible = max(
                # .ix doesn't work here because the frame index contains
                # integers, so 0 is still interpreted as a key.
                relevant_events.iloc[0].loc[['event_date', 'timestamp']],
            )
            event2_first_eligible = max(
                relevant_events.iloc[1].loc[['event_date', 'timestamp']]
            )

            for date, computed_value in zip(dates, asset_result):
                if date >= event2_first_eligible:
                    # If we've seen event 2, it should win even if we've seen
                    # event 1, because events are sorted by event_date.
                    self.assertEqual(computed_value, v2)
                elif date >= event1_first_eligible:
                    # If we've seen event 1 but not event 2, event 1 should
                    # win.
                    self.assertEqual(computed_value, v1)
                else:
                    # If we haven't seen either event, then we should have
                    # column.missing_value.
                    assert_equal(
                        computed_value,
                        column.missing_value,
                        # Coerce from Timestamp to datetime64.
                        allow_datetime_coercions=True,
                    )

    def check_next_value_results(self, column, results, dates):
        """
        Check next value results for a single column.

        Parameters
        ----------
        column : a key of ``self.next_value_columns``
            The dataset column whose results are being checked.
        results : pd.DataFrame
            Unstacked pipeline output for ``column``; iterated column by
            column, with each column label exposing ``.sid``.
        dates : pd.DatetimeIndex
            The dates the pipeline was run over.
        """
        # Verify that we got a result for every sid.
        self.assert_result_contains_all_sids(results)

        events = self.raw_events_no_nulls
        # Remove timezone info from trading days, since the outputs
        # from pandas won't be tz_localized.
        dates = dates.tz_localize(None)

        # NOTE(review): DataFrame.iteritems was removed in pandas 2.0; if the
        # pandas pin is ever raised this needs to become ``.items()``.
        for asset, asset_result in results.iteritems():
            relevant_events = events[events.sid == asset.sid]
            self.assertEqual(len(relevant_events), 2)

            v1, v2 = relevant_events[self.next_value_columns[column]]
            e1, e2 = relevant_events['event_date']
            t1, t2 = relevant_events['timestamp']

            for date, computed_value in zip(dates, asset_result):
                # An event is eligible to be the next event on any date
                # between its timestamp and its event_date, inclusive.
                if t1 <= date <= e1:
                    # If event 1 is eligible it should be chosen even if
                    # event 2 is also eligible, since it's earlier.
                    self.assertEqual(computed_value, v1)
                elif t2 <= date <= e2:
                    # Event 2 is eligible and event 1 is not, so event 2
                    # should be chosen.
                    self.assertEqual(computed_value, v2)
                else:
                    # Neither event is eligible, so we should have
                    # column.missing_value.
                    assert_equal(
                        computed_value,
                        column.missing_value,
                        # Coerce from Timestamp to datetime64.
                        allow_datetime_coercions=True,
                    )

    def test_wrong_cols(self):
        # Test wrong cols (cols != expected)
        events = pd.DataFrame({
            'c': [5],
            SID_FIELD_NAME: [1],
            TS_FIELD_NAME: [pd.Timestamp('2014')],
            EVENT_DATE_FIELD_NAME: [pd.Timestamp('2014')],
        })

        # Mapping onto a column that exists must not raise.
        EventsLoader(events, {EventDataSet.next_float: 'c'}, {})
        EventsLoader(events, {}, {EventDataSet.previous_float: 'c'})

        # Mapping onto a column that does not exist must raise.
        with self.assertRaises(ValueError) as e:
            EventsLoader(events, {EventDataSet.next_float: 'd'}, {})

        msg = str(e.exception)
        expected = (
            "EventsLoader missing required columns ['d'].\n"
            "Got Columns: ['c', 'event_date', 'sid', 'timestamp']\n"
            "Expected Columns: ['d', 'event_date', 'sid', 'timestamp']"
        )
        self.assertEqual(msg, expected)
|
|
|
|
|
|
class BlazeEventsLoaderTestCase(EventsLoaderTestCase):
    """
    Re-run every test inherited from EventsLoaderTestCase against a
    BlazeEventsLoader.
    """

    @classmethod
    def make_loader(cls, events, next_value_columns, previous_value_columns):
        # Wrap the raw events frame in a blaze expression before handing it
        # to the loader.
        events_expr = bz.data(events)
        return BlazeEventsLoader(
            events_expr,
            next_value_columns,
            previous_value_columns,
        )
|
|
|
|
|
|
class EventLoaderUtilsTestCase(ZiplineTestCase):
    """
    Tests for ``normalize_timestamp_to_query_time``.
    """
    # These cases test the following:
    # 1. Shuffling timestamps in DST/EST produces the correct normalized
    # timestamps
    # 2. Timestamps at query time boundaries are normalized correctly

    # Wall-clock times just before, at, and just after the 8:45 query time.
    boundary_dates = [
        pd.Timestamp('2013-01-04 8:44:59'),
        pd.Timestamp('2013-01-04 8:45:00'),
        pd.Timestamp('2013-01-04 8:46:00'),
    ]
    us_boundary_dates = [
        date.tz_localize('US/Eastern') for date in boundary_dates
    ]
    moscow_boundary_dates = [
        date.tz_localize('Europe/Moscow') for date in boundary_dates
    ]
    mixed_tz_dates = [
        pd.Timestamp('2013-12-30'),
        pd.Timestamp('2013-01-24'),
        pd.Timestamp('2013-01-31 20:00:00'),
        pd.Timestamp('2013-04-04'),
        pd.Timestamp('2013-04-21'),
        pd.Timestamp('2013-06-01'),
    ]
    # Convert to UTC, then drop the tz information.
    us_dates = pd.to_datetime(
        us_boundary_dates + mixed_tz_dates,
        utc=True,
    ).tz_localize(None)
    moscow_dates = pd.to_datetime(
        moscow_boundary_dates + mixed_tz_dates,
        utc=True,
    ).tz_localize(None)

    # Every ordering of the positions of the dates above.
    all_combos = list(
        map(
            np.array,
            itertools.permutations(
                np.arange(len(boundary_dates + mixed_tz_dates))
            ),
        )
    )
    # Running every permutation takes too long, so only every 100th one is
    # kept; that still gives us good coverage of the different
    # interleavings.
    # NOTE(review): there are 3 + 6 = 9 dates above, so this is 9! = 362880
    # permutations sampled down to roughly 3600 -- confirm whether a larger
    # stride was intended.
    combos = all_combos[::100]

    expected_us = pd.Series(
        [
            pd.Timestamp('2013-01-04'),
            pd.Timestamp('2013-01-05'),
            pd.Timestamp('2013-01-05'),
            pd.Timestamp('2013-12-30'),
            pd.Timestamp('2013-01-24'),
            pd.Timestamp('2013-02-01'),
            pd.Timestamp('2013-04-04'),
            pd.Timestamp('2013-04-21'),
            pd.Timestamp('2013-06-01'),
        ]
    ).values

    # Russia's TZ offset is +4
    expected_russia = pd.Series(
        [
            pd.Timestamp('2013-01-04'),
            pd.Timestamp('2013-01-05'),
            pd.Timestamp('2013-01-05'),
            pd.Timestamp('2013-12-30'),
            pd.Timestamp('2013-01-24'),
            pd.Timestamp('2013-01-31'),
            pd.Timestamp('2013-04-04'),
            pd.Timestamp('2013-04-21'),
            pd.Timestamp('2013-06-01'),
        ]
    ).values

    # Test with timezones on either side of the meridian
    @parameterized.expand([(expected_us, 'US/Eastern', us_dates),
                           (expected_russia, 'Europe/Moscow', moscow_dates)])
    def test_normalize_to_query_time(self, expected, tz, dates):
        # Order matters in pandas 0.18.2. Prior to that, using tz_convert on
        # a DatetimeIndex with DST/EST timestamps mixed resulted in some of
        # them being an hour off (1 hour past midnight).
        query_time = time(8, 45)
        for ordering in self.combos:
            shuffled = pd.DataFrame({"timestamp": dates[ordering]})
            normalized = normalize_timestamp_to_query_time(
                shuffled,
                query_time,
                tz,
                inplace=False,
                ts_field='timestamp',
            )

            actual = np.sort(normalized['timestamp'].values)
            wanted = np.sort(expected[ordering])
            check_arrays(actual, wanted)
|