diff --git a/dev/cli.py b/dev/cli.py new file mode 100644 index 00000000..3fd8a4b3 --- /dev/null +++ b/dev/cli.py @@ -0,0 +1 @@ +# TODO: move qexec console here diff --git a/logging.cfg b/logging.cfg new file mode 100644 index 00000000..911d2a2a --- /dev/null +++ b/logging.cfg @@ -0,0 +1,37 @@ +[loggers] +keys=root + +[handlers] +keys=consoleHandler,filesystemHandler + +[formatters] +keys=ziplineformat + +# ------- + +[logger_root] +level=DEBUG +handlers=consoleHandler,filesystemHandler +qualname=ZiplineLogger + +# ------- + +[handler_filesystemHandler] +class=handlers.RotatingFileHandler +level=DEBUG +formatter=ziplineformat +args=("/var/log/zipline/zipline.log",10*1024*1024,5) +propagate=1 + +[handler_consoleHandler] +class=StreamHandler +level=ERROR +formatter=ziplineformat +args=(sys.stdout,) +propagate=1 + +# ------- + +[formatter_ziplineformat] +format=%(asctime)s %(levelname)s %(filename)s %(funcName)s - %(message)s +datefmt=%Y-%m-%d %H:%M:%S %Z diff --git a/zipline/test/__init__.py b/tests/__init__.py similarity index 100% rename from zipline/test/__init__.py rename to tests/__init__.py diff --git a/zipline/test/client.py b/tests/client.py similarity index 86% rename from zipline/test/client.py rename to tests/client.py index 324f52a7..03874b95 100644 --- a/zipline/test/client.py +++ b/tests/client.py @@ -1,15 +1,16 @@ +import logging from gevent_zeromq import zmq -import zipline.util as qutil -import zipline.messaging as qmsg import zipline.protocol as zp +from zipline.core.component import Component from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE -from zipline.finance.trading import TradeSimulationClient -class TestClient(qmsg.Component): +LOGGER = logging.getLogger('ZiplineLogger') + +class TestClient(Component): def __init__(self): - qmsg.Component.__init__(self) + Component.__init__(self) self.init() def init(self): @@ -55,7 +56,7 @@ class TestClient(qmsg.Component): #logger.info('msg:' + str(msg)) if msg == str(CONTROL_PROTOCOL.DONE): - 
qutil.LOGGER.info("Client is DONE!") + LOGGER.info("Client is DONE!") self.signal_done() return @@ -79,7 +80,7 @@ class TestClient(qmsg.Component): self.prev_dt = event.dt if self.received_count % 100 == 0: - qutil.LOGGER.info("received {n} messages".format(n=self.received_count)) - + LOGGER.info("received {n} messages".format(n=self.received_count)) + def unframe(self, msg): return zp.MERGE_UNFRAME(msg) diff --git a/zipline/test/test_finance.py b/tests/test_finance.py similarity index 92% rename from zipline/test/test_finance.py rename to tests/test_finance.py index 0876e19c..b5ef0e4c 100644 --- a/zipline/test/test_finance.py +++ b/tests/test_finance.py @@ -1,5 +1,6 @@ -"""Tests for the zipline.finance package""" -import mock +""" +Tests for the zipline.finance package +""" import pytz from unittest2 import TestCase @@ -8,22 +9,16 @@ from collections import defaultdict from nose.tools import timed -import zipline.test.factory as factory -import zipline.util as qutil -import zipline.finance.risk as risk +import zipline.utils.factory as factory import zipline.protocol as zp -import zipline.finance.performance as perf -from zipline.test.algorithms import TestAlgorithm -from zipline.sources import SpecificEquityTrades -from zipline.finance.trading import TransactionSimulator, \ -TradeSimulationClient, TradingEnvironment -from zipline.simulator import AddressAllocator, Simulator -from zipline.monitor import Controller +from zipline.test_algorithms import TestAlgorithm +from zipline.finance.trading import TradingEnvironment +from zipline.core.devsimulator import AddressAllocator from zipline.lines import SimulatedTrading from zipline.finance.performance import PerformanceTracker -from zipline.protocol_utils import namedict -from zipline.finance.trading import SIMULATION_STYLE +from zipline.utils.protocol_utils import ndict +from zipline.finance.trading import TransactionSimulator, SIMULATION_STYLE DEFAULT_TIMEOUT = 15 # seconds EXTENDED_TIMEOUT = 90 @@ -35,7 +30,7 @@ 
class FinanceTestCase(TestCase): leased_sockets = defaultdict(list) def setUp(self): - qutil.configure_logging() + #qutil.configure_logging() self.zipline_test_config = { 'allocator':allocator, 'sid':133 @@ -148,7 +143,7 @@ class FinanceTestCase(TestCase): # TODO: for some reason the orders aren't filled without an extra # trade. - trade_count = 5001 + trade_count = 5 self.zipline_test_config['order_count'] = trade_count - 1 self.zipline_test_config['trade_count'] = trade_count self.zipline_test_config['order_amount'] = 1 @@ -156,7 +151,7 @@ class FinanceTestCase(TestCase): # tell the simulator to fill the orders in individual transactions # matching the order volume exactly. self.zipline_test_config['simulation_style'] = \ - SIMULATION_STYLE.FIXED_SLIPPAGE + SIMULATION_STYLE.FIXED_SLIPPAGE self.zipline_test_config['environment'] = factory.create_trading_environment() sid_list = [self.zipline_test_config['sid']] @@ -416,7 +411,7 @@ class FinanceTestCase(TestCase): alternate = params.get('alternate') # if present, expect transaction amounts to match orders exactly. 
complete_fill = params.get('complete_fill') - + trading_environment = factory.create_trading_environment() trade_sim = TransactionSimulator() price = [10.1] * trade_count @@ -424,22 +419,22 @@ class FinanceTestCase(TestCase): start_date = trading_environment.first_open sid = 1 - generated_trades = factory.create_trade_history( - sid, - price, - volume, - trade_interval, - trading_environment + generated_trades = factory.create_trade_history( + sid, + price, + volume, + trade_interval, + trading_environment ) - + if alternate: alternator = -1 else: alternator = 1 - + order_date = start_date for i in xrange(order_count): - order = namedict( + order = ndict( { 'sid' : sid, 'amount' : order_amount * alternator**i, @@ -448,7 +443,7 @@ class FinanceTestCase(TestCase): }) trade_sim.add_open_order(order) - + order_date = order_date + order_interval # move after market orders to just after market next # market open. @@ -456,40 +451,40 @@ class FinanceTestCase(TestCase): if order_date.minute >= 00: order_date = order_date + timedelta(days=1) order_date = order_date.replace(hour=14, minute=30) - + # there should now be one open order list stored under the sid oo = trade_sim.open_orders self.assertEqual(len(oo), 1) self.assertTrue(oo.has_key(sid)) order_list = oo[sid] self.assertEqual(order_count, len(order_list)) - + for i in xrange(order_count): order = order_list[i] self.assertEqual(order.sid, sid) self.assertEqual(order.amount, order_amount * alternator**i) - - + + tracker = PerformanceTracker(trading_environment) - + # this approximates the loop inside TradingSimulationClient transactions = [] for trade in generated_trades: if trade_delay: trade.dt = trade.dt + trade_delay - + txn = trade_sim.apply_trade_to_open_orders(trade) if txn: - transactions.append(txn) - trade.TRANSACTION = txn + transactions.append(txn) + trade.TRANSACTION = txn else: trade.TRANSACTION = None - - tracker.process_event(trade) - + + tracker.process_event(trade) + if complete_fill: - 
self.assertEqual(len(transactions), len(order_list)) - + self.assertEqual(len(transactions), len(order_list)) + total_volume = 0 for i in xrange(len(transactions)): txn = transactions[i] @@ -497,18 +492,18 @@ class FinanceTestCase(TestCase): if complete_fill: order = order_list[i] self.assertEqual(order.amount, txn.amount) - - self.assertEqual(total_volume, expected_txn_volume) + + self.assertEqual(total_volume, expected_txn_volume) self.assertEqual(len(transactions), expected_txn_count) - + cumulative_pos = tracker.cumulative_performance.positions[sid] self.assertEqual(total_volume, cumulative_pos.amount) - + # the open orders should now be empty oo = trade_sim.open_orders self.assertTrue(oo.has_key(sid)) order_list = oo[sid] self.assertEqual(0, len(order_list)) - - - \ No newline at end of file + + + diff --git a/zipline/test/test_ndict.py b/tests/test_ndict.py similarity index 83% rename from zipline/test/test_ndict.py rename to tests/test_ndict.py index e2cb8a84..63f1f4df 100644 --- a/zipline/test/test_ndict.py +++ b/tests/test_ndict.py @@ -1,4 +1,4 @@ -from zipline.protocol_utils import ndict, namedict +from zipline.utils.protocol_utils import ndict def test_ndict(): nd = ndict({}) @@ -21,11 +21,18 @@ def test_ndict(): assert 'x' in nd assert 'y' not in nd + # Mutability + nd2 = ndict({'x': 1}) + assert nd2.x == 1 + nd2.x = 2 + assert nd2.x == 2 + # Class isolation assert '__init__' not in nd assert '__iter__' not in nd assert not nd.__dict__.has_key('x') assert nd.get('__init__') is None + assert 'x' not in set(dir(nd)) # Comparison nd2 = nd.copy() diff --git a/zipline/test/test_perf_tracking.py b/tests/test_perf_tracking.py similarity index 98% rename from zipline/test/test_perf_tracking.py rename to tests/test_perf_tracking.py index e952fed4..e9f5b4d2 100644 --- a/zipline/test/test_perf_tracking.py +++ b/tests/test_perf_tracking.py @@ -4,18 +4,19 @@ import random import datetime import pytz -import zipline.test.factory as factory -import 
zipline.test.algorithms -import zipline.util as qutil +import zipline.utils.factory as factory +import zipline.test_algorithms +#import zipline.util as qutil import zipline.finance.performance as perf import zipline.finance.risk as risk import zipline.protocol as zp from zipline.finance.trading import TradeSimulationClient, TradingEnvironment, \ -SIMULATION_STYLE + SIMULATION_STYLE + class PerformanceTestCase(unittest.TestCase): def setUp(self): - qutil.configure_logging() + #qutil.configure_logging() self.benchmark_returns, self.treasury_curves = \ factory.load_market_data() @@ -546,7 +547,7 @@ shares in position" #create a transaction for all but #first trade in each sid, to simulate None transaction if(event.dt != self.trading_environment.period_start): - txn = zp.namedict({ + txn = zp.ndict({ 'sid' : event.sid, 'amount' : -25, 'dt' : event.dt, @@ -565,4 +566,4 @@ shares in position" cumulative_pos = perf_tracker.cumulative_performance.positions[sid] expected_size = txn_count / 2 * -25 self.assertEqual(cumulative_pos.amount, expected_size) - \ No newline at end of file + diff --git a/zipline/test/test_protocol.py b/tests/test_protocol.py similarity index 91% rename from zipline/test/test_protocol.py rename to tests/test_protocol.py index c0d4a7c7..d26c2feb 100644 --- a/zipline/test/test_protocol.py +++ b/tests/test_protocol.py @@ -9,11 +9,11 @@ from collections import defaultdict from nose.tools import timed -import zipline.test.factory as factory -import zipline.util as qutil +import zipline.utils.factory as factory +from zipline.utils import logger import zipline.protocol as zp -from zipline.sources import SpecificEquityTrades +from zipline.finance.sources import SpecificEquityTrades DEFAULT_TIMEOUT = 5 # seconds @@ -22,7 +22,7 @@ class ProtocolTestCase(TestCase): leased_sockets = defaultdict(list) def setUp(self): - qutil.configure_logging() + #qutil.configure_logging() self.trading_environment = factory.create_trading_environment() @timed(DEFAULT_TIMEOUT) @@ 
-45,7 +45,7 @@ class ProtocolTestCase(TestCase): for trade in trades: #simulate data source sending frame - msg = zp.DATASOURCE_FRAME(zp.namedict(trade)) + msg = zp.DATASOURCE_FRAME(zp.ndict(trade)) #feed unpacking frame recovered_trade = zp.DATASOURCE_UNFRAME(msg) #feed sending frame @@ -74,13 +74,13 @@ class ProtocolTestCase(TestCase): self.assertTrue(event.helloworld == 2345.6) event.delete('helloworld') - self.assertEqual(zp.namedict(trade), event) + self.assertEqual(zp.ndict(trade), event) @timed(DEFAULT_TIMEOUT) def test_order_protocol(self): #client places an order now = datetime.utcnow().replace(tzinfo=pytz.utc) - order = zp.namedict({ + order = zp.ndict({ 'dt':now, 'sid':133, 'amount':100 @@ -94,7 +94,7 @@ class ProtocolTestCase(TestCase): self.assertEqual(order.dt, now) #order datasource datasource frames the order - order_event = zp.namedict({ + order_event = zp.ndict({ "sid" : order.sid, "amount" : order.amount, "dt" : order.dt, @@ -111,7 +111,7 @@ class ProtocolTestCase(TestCase): self.assertEqual(now, recovered_order.dt) #create a transaction from the order - txn = zp.namedict({ + txn = zp.ndict({ 'sid' : recovered_order.sid, 'amount' : recovered_order.amount, 'dt' : recovered_order.dt, diff --git a/zipline/test/test_risk.py b/tests/test_risk.py similarity index 99% rename from zipline/test/test_risk.py rename to tests/test_risk.py index 25685143..21b5785b 100644 --- a/zipline/test/test_risk.py +++ b/tests/test_risk.py @@ -4,15 +4,14 @@ import datetime import calendar import pytz import zipline.finance.risk as risk -import zipline.test.factory as factory -import zipline.util as qutil +from zipline.utils import factory from zipline.finance.trading import TradingEnvironment class Risk(unittest.TestCase): def setUp(self): - qutil.configure_logging() + #qutil.configure_logging() start_date = datetime.datetime( year=2006, month=1, @@ -354,4 +353,4 @@ RETURNS = [ 0.048 , -0.0307, -0.0357, 0.0033, -0.0412, -0.0407, 0.0455, 0.0159, -0.0051, -0.0274, -0.0213, 
0.0361, 0.0051, -0.0378, 0.0084, 0.0066, -0.0103, -0.0037, 0.0478, -0.0278 -] \ No newline at end of file +] diff --git a/zipline/test/test_sanity.py b/tests/test_sanity.py similarity index 100% rename from zipline/test/test_sanity.py rename to tests/test_sanity.py diff --git a/zipline/__init__.py b/zipline/__init__.py index f47dbf47..23bcca40 100644 --- a/zipline/__init__.py +++ b/zipline/__init__.py @@ -1,3 +1,20 @@ """ -QSim provides asynchronous simulation of historic data streams, simulated trade execution, and data stream transformations. -""" \ No newline at end of file +Zipline +""" + +# This is *not* a place to dump arbitrary classes/modules for convenience, +# it is a place to expose the public interfaces. + +import protocol # namespace +from core.monitor import Controller +from lines import SimulatedTrading +from core.host import ComponentHost +from utils.protocol_utils import ndict + +__all__ = [ + SimulatedTrading, + Controller, + ComponentHost, + protocol, + ndict +] diff --git a/zipline/components/__init__.py b/zipline/components/__init__.py new file mode 100644 index 00000000..b845f2db --- /dev/null +++ b/zipline/components/__init__.py @@ -0,0 +1,11 @@ +from feed import Feed +from merge import Merge +from passthrough import PassthroughTransform +from datasource import DataSource + +__all__ = [ + Feed, + Merge, + PassthroughTransform, + DataSource, +] diff --git a/zipline/components/datasource.py b/zipline/components/datasource.py new file mode 100644 index 00000000..8c14022b --- /dev/null +++ b/zipline/components/datasource.py @@ -0,0 +1,66 @@ +""" +Commonly used messaging components. +""" + +import logging + +import zipline.protocol as zp +from zipline.core.component import Component +from zipline.protocol import COMPONENT_TYPE + +LOGGER = logging.getLogger('ZiplineLogger') + +class DataSource(Component): + """ + Baseclass for data sources. 
Subclass and implement send_all - usually this + means looping through all records in a store, converting to a dict, and + calling send(map). + + Every datasource has a dict property to hold filters:: + - key -- name of the filter, e.g. SID + - value -- a primitive representing the filter. e.g. a list of ints. + + Modify the datasource's filters via the set_filter(name, value) + """ + def __init__(self, source_id): + Component.__init__(self) + + self.id = source_id + self.init() + self.filter = {} + + def init(self): + self.cur_event = None + + def set_filter(self, name, value): + self.filter[name] = value + + @property + def get_id(self): + return self.id + + @property + def get_type(self): + return COMPONENT_TYPE.SOURCE + + def open(self): + self.data_socket = self.connect_data() + + def send(self, event): + """ + Emit data. + """ + assert isinstance(event, zp.ndict) + + event['source_id'] = self.get_id + event['type'] = self.get_type + + try: + ds_frame = self.frame(event) + except zp.INVALID_DATASOURCE_FRAME as exc: + return self.signal_exception(exc) + + self.data_socket.send(ds_frame) + + def frame(self, event): + return zp.DATASOURCE_FRAME(event) diff --git a/zipline/components/feed.py b/zipline/components/feed.py new file mode 100644 index 00000000..bff79e79 --- /dev/null +++ b/zipline/components/feed.py @@ -0,0 +1,209 @@ +import logging +from collections import Counter + +from zipline.core.component import Component +import zipline.protocol as zp + +from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ + CONTROL_FRAME, CONTROL_UNFRAME + +LOGGER = logging.getLogger('ZiplineLogger') + +class Feed(Component): + """ + Connects to N PULL sockets, publishing all messages received to a PUB + socket. Published messages are guaranteed to be in chronological order + based on message property dt. Expects to be instantiated in one execution + context (thread, process, etc) and run in another. 
+ """ + + def __init__(self): + Component.__init__(self) + + self.sent_count = 0 + self.received_count = 0 + self.draining = False + self.ds_finished_counter = 0 + + # Depending on the size of this, might want to use a data + # structure with better asymptotics. + self.data_buffer = {} + + # source_id -> integer count + self.sent_counters = Counter() + self.recv_counters = Counter() + + def init(self): + pass + + @property + def get_id(self): + return "FEED" + + @property + def get_type(self): + return COMPONENT_TYPE.CONDUIT + + # ------------- + # Core Methods + # ------------- + + def open(self): + self.pull_socket = self.bind_data() + self.feed_socket = self.bind_feed() + + def do_work(self): + # wait for synchronization reply from the host + socks = dict(self.poll.poll(self.heartbeat_timeout)) + + # TODO: Abstract this out, maybe on base component + if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN: + msg = self.control_in.recv() + event, payload = CONTROL_UNFRAME(msg) + + # -- Heartbeat -- + if event == CONTROL_PROTOCOL.HEARTBEAT: + # Heart outgoing + heartbeat_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.OK, + payload + ) + self.control_out.send(heartbeat_frame) + + # -- Soft Kill -- + elif event == CONTROL_PROTOCOL.SHUTDOWN: + self.signal_done() + self.shutdown() + + # -- Hard Kill -- + elif event == CONTROL_PROTOCOL.KILL: + self.kill() + + + if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN: + message = self.pull_socket.recv() + + if message == str(CONTROL_PROTOCOL.DONE): + self.ds_finished_counter += 1 + + if len(self.data_buffer) == self.ds_finished_counter: + #drain any remaining messages in the buffer + LOGGER.debug("draining feed") + self.drain() + self.signal_done() + else: + try: + event = self.unframe(message) + # deserialization error + except zp.INVALID_DATASOURCE_FRAME as exc: + return self.signal_exception(exc) + + try: + self.append(event) + self.send_next() + + # Invalid message + except 
zp.INVALID_DATASOURCE_FRAME as exc: + return self.signal_exception(exc) + + def unframe(self, msg): + return zp.DATASOURCE_UNFRAME(msg) + + def frame(self, event): + return zp.FEED_FRAME(event) + + # ------------- + # Flow Control + # ------------- + + def drain(self): + """ + Send all messages in the buffer. + """ + self.draining = True + while self.pending_messages() > 0: + self.send_next() + + def send_next(self): + """ + Send the (chronologically) next message in the buffer. + """ + if not (self.is_full() or self.draining): + return + + event = self.next() + if(event != None): + self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK) + self.sent_counters[event.source_id] += 1 + self.sent_count += 1 + + def append(self, event): + """ + Add an event to the buffer for the source specified by + source_id. + """ + self.data_buffer[event.source_id].append(event) + self.recv_counters[event.source_id] += 1 + self.received_count += 1 + + def next(self): + """ + Get the next message in chronological order. + """ + if not(self.is_full() or self.draining): + return + + cur_source = None + earliest_source = None + earliest_event = None + #iterate over the queues of events from all sources + #(1 queue per datasource) + for events in self.data_buffer.values(): + if len(events) == 0: + continue + cur_source = events + first_in_list = events[0] + if first_in_list.dt == None: + #this is a filler event, discard + events.pop(0) + continue + + if (earliest_event == None) or (first_in_list.dt <= earliest_event.dt): + earliest_event = first_in_list + earliest_source = cur_source + + if earliest_event != None: + return earliest_source.pop(0) + + def is_full(self): + """ + Indicates whether the buffer has messages in buffer for + all un-DONE, blocking sources. + """ + for source_id, events in self.data_buffer.iteritems(): + if len(events) == 0: + return False + return True + + def pending_messages(self): + """ + Returns the count of all events from all sources in the + buffer. 
+ """ + total = 0 + for events in self.data_buffer.values(): + total += len(events) + return total + + def add_source(self, source_id): + """ + Add a data source to the buffer. + """ + self.data_buffer[source_id] = [] + + def __len__(self): + """ + Buffer's length is same as internal map holding separate + sorted arrays of events keyed by source id. + """ + return len(self.data_buffer) diff --git a/zipline/components/merge.py b/zipline/components/merge.py new file mode 100644 index 00000000..83694311 --- /dev/null +++ b/zipline/components/merge.py @@ -0,0 +1,68 @@ +from feed import Feed + +import zipline.protocol as zp +from zipline.protocol import COMPONENT_TYPE + +# TODO: By Liskov merge must *be* a feed, don't believe this is +# the case. + +class Merge(Feed): + """ + Merges multiple streams of events into single messages. + """ + + def __init__(self): + Feed.__init__(self) + + self.init() + + def init(self): + pass + + @property + def get_id(self): + return "MERGE" + + @property + def get_type(self): + return COMPONENT_TYPE.CONDUIT + + def open(self): + self.pull_socket = self.bind_merge() + self.feed_socket = self.bind_result() + + def next(self): + """Get the next merged message from the feed buffer.""" + if not (self.is_full() or self.draining): + return + + if self.pending_messages() == 0: + return + + #get the raw event from the passthrough transform. + result = self.data_buffer[zp.TRANSFORM_TYPE.PASSTHROUGH].pop(0).PASSTHROUGH + for source, events in self.data_buffer.iteritems(): + if source == zp.TRANSFORM_TYPE.PASSTHROUGH: + continue + if len(events) > 0: + cur = events.pop(0) + result.merge(cur) + return result + + def unframe(self, msg): + return zp.TRANSFORM_UNFRAME(msg) + + def frame(self, event): + return zp.MERGE_FRAME(event) + + def append(self, event): + """ + :param event: a ndict with one entry. key is the name of the + transform, value is the transformed value. + Add an event to the buffer for the source specified by + source_id. 
+ """ + + self.data_buffer[event.keys()[0]].append(event) + self.received_count += 1 + diff --git a/zipline/components/passthrough.py b/zipline/components/passthrough.py new file mode 100644 index 00000000..e7fa5d52 --- /dev/null +++ b/zipline/components/passthrough.py @@ -0,0 +1,35 @@ +import zipline.protocol as zp +from zipline.transforms import BaseTransform + +from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ + COMPONENT_STATE, CONTROL_FRAME, CONTROL_UNFRAME + +class PassthroughTransform(BaseTransform): + """ + A bypass transform which is also an identity transform:: + + +-------+ + +---| f |---> + +-------+ + +------id-------> + + """ + + def __init__(self, **kwargs): + BaseTransform.__init__(self, "PASSTHROUGH") + self.init(**kwargs) + + def init(self, **kwargs): + pass + + @property + def get_type(self): + return COMPONENT_TYPE.CONDUIT + + #TODO, could save some cycles by skipping the _UNFRAME call + # and just setting value to original msg string. + def transform(self, event): + return { + 'name' : zp.TRANSFORM_TYPE.PASSTHROUGH, + 'value' : zp.FEED_FRAME(event) + } diff --git a/zipline/core/__init__.py b/zipline/core/__init__.py new file mode 100644 index 00000000..d487dd05 --- /dev/null +++ b/zipline/core/__init__.py @@ -0,0 +1,9 @@ +from host import ComponentHost +from component import Component +from monitor import Controller + +__all__ = [ + Component, + Controller, + ComponentHost +] diff --git a/zipline/component.py b/zipline/core/component.py similarity index 97% rename from zipline/component.py rename to zipline/core/component.py index f522e946..479da548 100644 --- a/zipline/component.py +++ b/zipline/core/component.py @@ -1,6 +1,4 @@ """ -Commonly used messaging components. - Contains the base class for all components. 
""" @@ -9,7 +7,7 @@ import sys import uuid import time import socket -import gevent +import logging import traceback import humanhash @@ -20,13 +18,11 @@ import gevent_zeromq # zmq_ctypes #import zmq_ctypes -from datetime import datetime - -import zipline.util as qutil -from zipline.gpoll import _Poller as GeventPoller +from zipline.utils.gpoll import _Poller as GeventPoller from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE, \ - COMPONENT_FAILURE, BACKTEST_STATE, CONTROL_FRAME + COMPONENT_FAILURE, CONTROL_FRAME +LOGGER = logging.getLogger('ZiplineLogger') class Component(object): """ @@ -315,7 +311,7 @@ class Component(object): ) self.control_out.send(exception_frame) - qutil.LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) + LOGGER.exception("Unexpected error in run for {id}.".format(id=self.get_id)) def signal_done(self): """ @@ -342,7 +338,7 @@ class Component(object): #notify internal work look that we're done self.done = True # TODO: use state flag - qutil.LOGGER.info("[%s] DONE" % self.get_id) + LOGGER.info("[%s] DONE" % self.get_id) # ----------- # Messaging @@ -462,7 +458,7 @@ class Component(object): DEPRECATED, left in for compatability for now. """ - qutil.LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) + LOGGER.debug("Connecting sync client for {id}".format(id=self.get_id)) self.sync_socket = self.context.socket(self.zmq.REQ) self.sync_socket.connect(self.addresses['sync_address']) diff --git a/zipline/simulator.py b/zipline/core/devsimulator.py similarity index 91% rename from zipline/simulator.py rename to zipline/core/devsimulator.py index 728bca3d..37418d06 100644 --- a/zipline/simulator.py +++ b/zipline/core/devsimulator.py @@ -3,11 +3,7 @@ Simulator hosts all the components necessary to execute a simluation. 
See :py:me """ import threading -import mock -from collections import defaultdict -from zipline.monitor import Controller -from zipline.messaging import ComponentHost -import zipline.util as qutil +from zipline.core import ComponentHost class AddressAllocator(object): @@ -35,7 +31,7 @@ class Simulator(ComponentHost): ComponentHost.__init__(self, addresses) self.subthreads = [] self.running = False - + @property def get_id(self): return 'Simple Simulator' diff --git a/zipline/core/host.py b/zipline/core/host.py new file mode 100644 index 00000000..250daf96 --- /dev/null +++ b/zipline/core/host.py @@ -0,0 +1,164 @@ +import logging +import datetime + +from component import Component + +from zipline.transforms import BaseTransform +from zipline.components import Feed, Merge, PassthroughTransform, \ + DataSource +from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_STATE + +LOGGER = logging.getLogger('ZiplineLogger') + +class ComponentHost(Component): + """ + Components that can launch multiple sub-components, synchronize their + start, and then wait for all components to be finished. + """ + + def __init__(self, addresses): + Component.__init__(self) + self.addresses = addresses + self.running = False + + self.init() + + def init(self): + assert hasattr(self, 'zmq_flavor'), """ + You must specify a flavor of ZeroMQ for all + ComponentHost subclasses. """ + + # Component Registry, keyed by get_id + # ---------------------- + self.components = {} + # ---------------------- + # Internal Registry, keyed by guid + self._components = {} + # ---------------------- + + self.sync_register = {} + self.timeout = datetime.timedelta(seconds=60) + + self.feed = Feed() + self.merge = Merge() + self.passthrough = PassthroughTransform() + self.controller = None + + #register the feed and the merge + self.register_components([self.feed, self.merge, self.passthrough]) + + def register_controller(self, controller): + """ + Add the given components to the registry. 
Establish + communication with them. + """ + if self.controller != None: + raise Exception("There can be only one!") + + self.controller = controller + self.controller.zmq_flavor = self.zmq_flavor + + # Propogate the controller to all the subcomponents + for component in self.components.itervalues(): + component.controller = controller + + def register_components(self, components): + """ + Add the given components to the registry. Establish + communication with them. + """ + assert isinstance(components, list) + for component in components: + + component.addresses = self.addresses + component.controller = self.controller + + # Hosts share their zmq flavor with hosted components + component.zmq_flavor = self.zmq_flavor + + self._components[component.guid] = component + self.components[component.get_id] = component + self.sync_register[component.get_id] = datetime.datetime.utcnow() + + if isinstance(component, DataSource): + self.feed.add_source(component.get_id) + if isinstance(component, BaseTransform): + self.merge.add_source(component.get_id) + + def unregister_component(self, component_id): + del self.components[component_id] + del self.sync_register[component_id] + + def setup_sync(self): + """ + Setup the sync socket and poller. ( Bind ) + """ + LOGGER.debug("Connecting sync server.") + + self.sync_socket = self.context.socket(self.zmq.REP) + self.sync_socket.bind(self.addresses['sync_address']) + + self.sync_poller = self.zmq_poller() + self.sync_poller.register(self.sync_socket, self.zmq.POLLIN) + + self.sockets.append(self.sync_socket) + + def open(self): + for component in self.components.values(): + self.launch_component(component) + self.launch_controller() + + def is_running(self): + """ + DEPRECATED, left in for compatability for now. 
+ """ + + cur_time = datetime.datetime.utcnow() + + if len(self.components) == 0: + LOGGER.info("Component register is empty.") + return False + + return True + + def loop(self, lockstep=True): + + while self.is_running(): + # wait for synchronization request at start, and DONE at end. + # don't timeout. + socks = dict(self.sync_poller.poll()) + + if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN: + msg = self.sync_socket.recv() + + try: + parts = msg.split(':') + sync_id, status = parts + except ValueError as exc: + self.signal_exception(exc) + + if status == str(CONTROL_PROTOCOL.DONE): # TODO: other way around + LOGGER.debug("{id} is DONE".format(id=sync_id)) + self.unregister_component(sync_id) + self.state_flag = COMPONENT_STATE.DONE + else: + self.sync_register[sync_id] = datetime.datetime.utcnow() + + #qutil.LOGGER.info("confirmed {id}".format(id=msg)) + # send synchronization reply + self.sync_socket.send('ack', self.zmq.NOBLOCK) + + # ------------------ + # Simulation Control + # ------------------ + + def launch_controller(self, controller): + raise NotImplementedError + + def launch_component(self, component): + raise NotImplementedError + + def teardown_component(self, component): + raise NotImplementedError + + diff --git a/zipline/monitor.py b/zipline/core/monitor.py similarity index 96% rename from zipline/monitor.py rename to zipline/core/monitor.py index 6f72989b..5b2384f4 100644 --- a/zipline/monitor.py +++ b/zipline/core/monitor.py @@ -1,18 +1,18 @@ +import zmq import time import gevent import itertools -# pyzmq -import zmq +import logging import gevent_zeromq from collections import OrderedDict -from protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \ +from zipline.protocol import CONTROL_PROTOCOL, CONTROL_FRAME, \ CONTROL_UNFRAME, CONTROL_STATES, INVALID_CONTROL_FRAME \ states = CONTROL_STATES -from gpoll import _Poller as GeventPoller +from zipline.utils.gpoll import _Poller as GeventPoller # Roll Call ( Discovery ) # 
----------------------- @@ -159,7 +159,7 @@ class Controller(object): debug = False period = 1 - def __init__(self, pub_socket, route_socket, logging = None): + def __init__(self, pub_socket, route_socket, logger = None): self.context = None self.zmq = None @@ -182,11 +182,11 @@ class Controller(object): self.error_replay = OrderedDict() - if logging: - self.logging = logging + if logger: + self.logging = logger else: - import util as qutil - self.logging = qutil.LOGGER + default_logger = logging.getLogger('ZiplineLogger') + self.logging = default_logger def init_zmq(self, flavor): @@ -355,7 +355,7 @@ class Controller(object): if tic - self.ctime > self.period: break - if self.router in socks and socks[self.router] == self.zmq.POLLIN: + if socks.get(self.router) == self.zmq.POLLIN: rawmessage = self.router.recv() if rawmessage: @@ -369,9 +369,10 @@ class Controller(object): self.logging.error('Invalid frame', rawmessage) pass - if self.cancel in socks and socks[self.cancel] == self.zmq.POLLIN: + if socks.get(self.cancel) == self.zmq.POLLIN: self.logging.info('[Controller] Received Cancellation') rawmessage = self.cancel.recv() + self.cancel.send('') self.shutdown(soft=True) break diff --git a/zipline/finance/movingaverage.py b/zipline/finance/movingaverage.py index 329b631e..349a6638 100644 --- a/zipline/finance/movingaverage.py +++ b/zipline/finance/movingaverage.py @@ -1,7 +1,7 @@ from datetime import timedelta from collections import defaultdict -from zipline.messaging import BaseTransform +from zipline.transforms.base import BaseTransform class MovingAverageTransform(BaseTransform): diff --git a/zipline/finance/performance.py b/zipline/finance/performance.py index 02d018b6..6bf70058 100644 --- a/zipline/finance/performance.py +++ b/zipline/finance/performance.py @@ -40,8 +40,8 @@ Performance Tracking | | through all the events delivered to this tracker. 
| | | For details look at the comments for | | | :py:meth:`zipline.finance.risk.RiskMetrics.to_dict`| - +-----------------+----------------------------------------------------+ - | exceeded_max_ | True if the simulation was stopped because single | + +-----------------+----------------------------------------------------+ + | exceeded_max_ | True if the simulation was stopped because single | | loss | day losses exceeded the max_drawdown stipulated in | | | trading_environment. | +-----------------+----------------------------------------------------+ @@ -110,6 +110,8 @@ Performance Period """ + +import logging import datetime import pytz import msgpack @@ -118,10 +120,11 @@ import math import zmq -import zipline.util as qutil import zipline.protocol as zp import zipline.finance.risk as risk +LOGGER = logging.getLogger('ZiplineLogger') + class PerformanceTracker(): """ Tracks the performance of the zipline as it is running in @@ -188,7 +191,7 @@ class PerformanceTracker(): ) def get_portfolio(self): - return self.cumulative_performance.to_namedict() + return self.cumulative_performance.to_ndict() def publish_to(self, zmq_socket, context=None): """ @@ -228,7 +231,7 @@ class PerformanceTracker(): if self.exceeded_max_loss: return - assert isinstance(event, zp.namedict) + assert isinstance(event, zp.ndict) self.event_count += 1 if(event.dt >= self.market_close): @@ -280,8 +283,8 @@ class PerformanceTracker(): returns = self.todays_performance.returns max_dd = -1 * self.trading_environment.max_drawdown if returns < max_dd: - qutil.LOGGER.info(str(returns) + " broke through " + str(max_dd)) - qutil.LOGGER.info("Exceeded max drawdown.") + LOGGER.info(str(returns) + " broke through " + str(max_dd)) + LOGGER.info("Exceeded max drawdown.") # mark the perf period with max loss flag, # so it shows up in the update, but don't end the test # here. 
Let the update go out before stopping @@ -316,8 +319,8 @@ class PerformanceTracker(): """ log_msg = "Simulated {n} trading days out of {m}." - qutil.LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) - qutil.LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) + LOGGER.info(log_msg.format(n=self.day_count, m=self.total_days)) + LOGGER.info("first open: {d}".format(d=self.trading_environment.first_open)) # the stream will end on the last trading day, but will not trigger # an end of day, so we trigger the final market close here. @@ -332,7 +335,7 @@ class PerformanceTracker(): ) if self.result_stream: - qutil.LOGGER.info("about to stream the risk report...") + LOGGER.info("about to stream the risk report...") risk_dict = self.risk_report.to_dict() msg = zp.RISK_FRAME(risk_dict) @@ -518,18 +521,18 @@ class PerformancePeriod(): return rval - def to_namedict(self): + def to_ndict(self): """ - Creates a namedict representing the state of this perfomance period. + Creates a ndict representing the state of this perfomance period. Properties are the same as the results of to_dict. See header comments for a detailed description. 
""" - positions = self.get_positions(namedicted=True) + positions = self.get_positions(ndicted=True) - positions = zp.namedict(positions) + positions = zp.ndict(positions) - return zp.namedict({ + return zp.ndict({ 'ending_value' : self.ending_value, 'capital_used' : self.period_capital_used, 'starting_value' : self.starting_value, @@ -542,12 +545,12 @@ class PerformancePeriod(): 'transactions' : self.processed_transactions }) - def get_positions(self, namedicted=False): + def get_positions(self, ndicted=False): positions = {} for sid, pos in self.positions.iteritems(): cur = pos.to_dict() - if namedicted: - positions[sid] = zp.namedict(cur) + if ndicted: + positions[sid] = zp.ndict(cur) else: positions[sid] = cur diff --git a/zipline/finance/returns.py b/zipline/finance/returns.py index e8d3ce34..5e031f15 100644 --- a/zipline/finance/returns.py +++ b/zipline/finance/returns.py @@ -1,8 +1,5 @@ -import pandas -from datetime import timedelta from collections import defaultdict - -from zipline.messaging import BaseTransform +from zipline.transforms.base import BaseTransform class ReturnsTransform(BaseTransform): diff --git a/zipline/finance/risk.py b/zipline/finance/risk.py index 248e75a7..c68324bd 100644 --- a/zipline/finance/risk.py +++ b/zipline/finance/risk.py @@ -36,14 +36,15 @@ Risk Report """ +import logging import datetime import math import pytz import numpy as np import numpy.linalg as la -import zipline.util as qutil import zipline.protocol as zp +LOGGER = logging.getLogger('ZiplineLogger') def advance_by_months(dt, jump_in_months): month = dt.month + jump_in_months @@ -243,7 +244,7 @@ class RiskMetrics(): cur_return = math.log(1.0 + r) + cur_return #this is a guard for a single day returning -100% except ValueError: - qutil.LOGGER.warn("{cur} return, zeroing the returns".format(cur=cur_return)) + LOGGER.warn("{cur} return, zeroing the returns".format(cur=cur_return)) cur_return = 0.0 compounded_returns.append(cur_return) diff --git a/zipline/sources.py 
b/zipline/finance/sources.py similarity index 91% rename from zipline/sources.py rename to zipline/finance/sources.py index 4c63978a..6f7b3e55 100644 --- a/zipline/sources.py +++ b/zipline/finance/sources.py @@ -6,11 +6,12 @@ import random import pytz from mock import Mock -import zipline.messaging as zm +from zipline.components import DataSource +from zipline.utils import ndict + import zipline.protocol as zp - -class TradeDataSource(zm.DataSource): +class TradeDataSource(DataSource): def send(self, event): """ @@ -24,7 +25,7 @@ class TradeDataSource(zm.DataSource): if event.sid in self.filter['SID']: message = zp.DATASOURCE_FRAME(event) else: - blank = zp.namedict({ + blank = ndict({ "type" : zp.DATASOURCE_TYPE.TRADE, "source_id" : self.get_id }) @@ -39,7 +40,7 @@ class RandomEquityTrades(TradeDataSource): """ def __init__(self, sid, source_id, count): - zm.DataSource.__init__(self, source_id) + DataSource.__init__(self, source_id) self.count = count self.incr = 0 self.sid = sid @@ -59,7 +60,7 @@ class RandomEquityTrades(TradeDataSource): self.price = self.price + random.uniform(-0.05, 0.05) volume = random.randrange(100,10000,100) - event = zp.namedict({ + event = zp.ndict({ "type" : zp.DATASOURCE_TYPE.TRADE, "sid" : self.sid, "price" : self.price, @@ -70,7 +71,6 @@ class RandomEquityTrades(TradeDataSource): self.incr += 1 - class SpecificEquityTrades(TradeDataSource): """ Generates a random stream of trades for testing. 
@@ -88,7 +88,7 @@ class SpecificEquityTrades(TradeDataSource): 'volume' : integer for volume } """ - zm.DataSource.__init__(self, source_id) + DataSource.__init__(self, source_id) self.event_list = event_list self.count = 0 @@ -113,7 +113,5 @@ class SpecificEquityTrades(TradeDataSource): return event = self.event_list.pop(0) - self.send(zp.namedict(event)) + self.send(zp.ndict(event)) self.count +=1 - - diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 1910aff5..0efafdc4 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -1,3 +1,4 @@ +import logging import datetime import pytz import math @@ -6,14 +7,12 @@ import time from collections import Counter # from gevent.select import select -from zmq.core.poll import select -import zipline.messaging as qmsg -import zipline.util as qutil +from zipline.core import Component import zipline.protocol as zp import zipline.finance.performance as perf -from zipline.protocol_utils import Enum, ndict +from zipline.utils.protocol_utils import Enum, ndict # the simulation style enumerates the available transaction simulation # strategies. 
@@ -24,10 +23,12 @@ SIMULATION_STYLE = Enum( 'NOOP' ) -class TradeSimulationClient(qmsg.Component): +LOGGER = logging.getLogger('ZiplineLogger') + +class TradeSimulationClient(Component): def __init__(self, trading_environment, sim_style): - qmsg.Component.__init__(self) + Component.__init__(self) self.received_count = 0 self.prev_dt = None self.event_queue = None @@ -41,8 +42,7 @@ class TradeSimulationClient(qmsg.Component): self.last_msg_dt = datetime.datetime.utcnow() self.txn_sim = TransactionSimulator(sim_style) - assert self.trading_environment.frame_index != None - self.event_frame = ndict() + self.event_data = ndict() self.perf = perf.PerformanceTracker(self.trading_environment) @property @@ -90,7 +90,7 @@ class TradeSimulationClient(qmsg.Component): self.finish_simulation() def finish_simulation(self): - qutil.LOGGER.info("Client is DONE!") + LOGGER.info("Client is DONE!") # signal the performance tracker that the simulation has # ended. Perf will internally calculate the full risk report. self.perf.handle_simulation_end() @@ -147,19 +147,19 @@ class TradeSimulationClient(qmsg.Component): As per the algorithm protocol: - Set the current portfolio for the algorithm as per protocol. - - Construct frame based on backlog of events, send to algorithm. + - Construct data based on backlog of events, send to algorithm. 
""" current_portfolio = self.perf.get_portfolio() self.algorithm.set_portfolio(current_portfolio) - frame = self.get_frame() - if len(frame) > 0: - self.algorithm.handle_frame(frame) + data = self.get_data() + if len(data) > 0: + self.algorithm.handle_data(data) def connect_order(self): return self.connect_push_socket(self.addresses['order_address']) def order(self, sid, amount): - order = zp.namedict({ + order = zp.ndict({ 'dt':self.current_dt, 'sid':sid, 'amount':amount @@ -176,11 +176,11 @@ class TradeSimulationClient(qmsg.Component): self.event_queue = [] self.event_queue.append(event) - def get_frame(self): + def get_data(self): for event in self.event_queue: - self.event_frame[event['sid']] = event + self.event_data[event['sid']] = event self.event_queue = [] - return self.event_frame + return self.event_data class TransactionSimulator(object): @@ -214,7 +214,7 @@ class TransactionSimulator(object): log = "requested to trade zero shares of {sid}".format( sid=event.sid ) - qutil.LOGGER.debug(log) + LOGGER.debug(log) return if(not self.open_orders.has_key(event.sid)): @@ -338,7 +338,7 @@ for orders: event=str(event), orders=str(orders) ) - qutil.LOGGER.warn(warning) + LOGGER.warn(warning) return None @@ -351,7 +351,7 @@ for orders: 'commission' : self.commission * amount * direction, 'source_id' : zp.FINANCE_COMPONENT.TRANSACTION_SIM } - return zp.namedict(txn) + return zp.ndict(txn) class TradingEnvironment(object): @@ -370,7 +370,6 @@ class TradingEnvironment(object): self.trading_day_map = {} self.treasury_curves = treasury_curves self.benchmark_returns = benchmark_returns - self.frame_index = ['sid', 'volume', 'dt', 'price', 'changed'] self.period_start = period_start self.period_end = period_end self.capital_base = capital_base @@ -471,14 +470,6 @@ class TradingEnvironment(object): return self.trading_day_map[date].returns else: return 0.0 - - def add_to_frame(self, name): - """ - Add an entry to the frame index. - :param name: new index entry name. 
Used by TradingSimulationClient - to - """ - self.frame_index.append(name) diff --git a/zipline/finance/vwap.py b/zipline/finance/vwap.py index 9ef07299..8e404aa4 100644 --- a/zipline/finance/vwap.py +++ b/zipline/finance/vwap.py @@ -1,8 +1,6 @@ -import pandas -from datetime import timedelta from collections import defaultdict -from zipline.messaging import BaseTransform +from zipline.transforms.base import BaseTransform from zipline.finance.movingaverage import EventWindow class VWAPTransform(BaseTransform): diff --git a/zipline/lines.py b/zipline/lines.py index 932d9e2f..7bfea1c5 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -1,20 +1,20 @@ """ -Ziplines are composed of multiple components connected by asynchronous -messaging. All ziplines follow a general topology of parallel sources, -datetimestamp serialization, parallel transformations, and finally sinks. -Furthermore, many ziplines have common needs. For example, all trade -simulations require a +Ziplines are composed of multiple components connected by asynchronous +messaging. All ziplines follow a general topology of parallel sources, +datetimestamp serialization, parallel transformations, and finally sinks. +Furthermore, many ziplines have common needs. For example, all trade +simulations require a :py:class:`~zipline.finance.trading.TradeSimulationClient`. -To establish best practices and minimize code replication, the lines module +To establish best practices and minimize code replication, the lines module provides complete zipline topologies. You can extend any zipline without the need to extend the class. Simply instantiate any additional components -that you would like included in the zipline, and add them to the zipline -before invoking simulate. +that you would like included in the zipline, and add them to the zipline +before invoking simulate. 
+ - Here is a diagram of the SimulatedTrading zipline: - + +----------------------+ +------------------------+ | Trade History | | (DataSource added | @@ -60,62 +60,54 @@ before invoking simulate. +---------------------------------+ """ -import mock -import pytz +import logging -from datetime import datetime, timedelta -from collections import defaultdict +import zipline.utils.factory as factory -from nose.tools import timed +from zipline.components import DataSource +from zipline.transforms import BaseTransform -import zipline.test.factory as factory -import zipline.util as qutil -import zipline.finance.risk as risk -import zipline.protocol as zp -import zipline.finance.performance as perf -import zipline.messaging as zmsg - -from zipline.test.algorithms import TestAlgorithm -from zipline.sources import SpecificEquityTrades +from zipline.test_algorithms import TestAlgorithm from zipline.finance.trading import TradeSimulationClient -from zipline.simulator import AddressAllocator, Simulator -from zipline.monitor import Controller +from zipline.core.devsimulator import Simulator +from zipline.core.monitor import Controller from zipline.finance.trading import SIMULATION_STYLE +LOGGER = logging.getLogger('ZiplineLogger') class SimulatedTrading(object): """ Zipline with:: - + - _no_ data sources. - Trade simulation client, which is available to send callbacks on events and also accept orders to be simulated. - An order data source, which will receive orders from the trade - simulation client, and feed them into the event stream to be + simulation client, and feed them into the event stream to be serialized and order alongside all other data source events. - transaction simulation transformation, which receives the order events and estimates a theoretical execution price and volume. 
- + All components in this zipline are subject to heartbeat checks and a control monitor, which can kill the entire zipline in the event of exceptions in one of the components or an external request to end the simulation. """ - + def __init__(self, **config): """ :param config: a dict with the following required properties:: - + - algorithm: a class that follows the algorithm protocol. See - :py:meth:`zipline.finance.trading.TradingSimulationClient.add_algorithm` + :py:meth:`zipline.finance.trading.TradingSimulationClient.add_algorithm for details. - trading_environment: an instance of :py:class:`zipline.trading.TradingEnvironment` - - allocator: an instance of + - allocator: an instance of :py:class:`zipline.simulator.AddressAllocator` - - simulator_class: a :py:class:`zipline.messaging.ComponentHost` + - simulator_class: a :py:class:`zipline.core.host.ComponentHost` subclass (not an instance) - - simulation_style: optional parameter that configures the + - simulation_style: optional parameter that configures the :py:class:`zipline.finance.trading.TransactionSimulator`. 
Expects a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` """ @@ -124,10 +116,10 @@ class SimulatedTrading(object): self.allocator = config['allocator'] self.trading_environment = config['trading_environment'] self.sim_style = config.get('simulation_style') - + self.leased_sockets = [] self.sim_context = None - + sockets = self.allocate_sockets(8) addresses = { 'sync_address' : sockets[0], @@ -141,75 +133,75 @@ class SimulatedTrading(object): self.con = Controller( sockets[6], sockets[7], - logging = qutil.LOGGER + logger = LOGGER ) - + self.con.cancel_socket = self.allocator.lease(1)[0] # TODO: Not freeform self.con.manage( 'freeform' ) - + self.started = False - + self.sim = config['simulator_class'](addresses) - + self.clients = {} self.trading_client = TradeSimulationClient( self.trading_environment, self.sim_style ) self.add_client(self.trading_client) - + # setup all sources self.sources = {} #self.order_source = OrderDataSource() #self.add_source(self.order_source) - + #setup transforms #self.transaction_sim = TransactionSimulator(self.sim_style) self.transforms = {} #self.add_transform(self.transaction_sim) - + self.sim.register_controller( self.con ) self.sim.on_done = self.shutdown() - - + + self.trading_client.set_algorithm(self.algorithm) - + @staticmethod def create_test_zipline(**config): """ :param config: A configuration object that is a dict with: - + - environment - a \ :py:class:`zipline.finance.trading.TradingEnvironment` - allocator - a :py:class:`zipline.simulator.AddressAllocator` - - sid - an integer, which will be used as the security ID. + - sid - an integer, which will be used as the security ID. - order_count - the number of orders the test algo will place, defaults to 100 - order_amount - the number of shares per order, defaults to 100 - trade_count - the number of trades to simulate, defaults to 101 to ensure all orders are processed. 
- - simulator_class - optional parameter that provides an alternative + - simulator_class - optional parameter that provides an alternative subclass of ComponentHost to hold the whole zipline. Defaults to - :py:class:`zipline.simulator.Simulator` + :py:class:`zipline.simulator.Simulator` - algorithm - optional parameter providing an algorithm. defaults to :py:class:`zipline.test.algorithms.TestAlgorithm` - trade_source - optional parameter to specify trades, if present. - If not present :py:class:`ziplien.sources.SpecificEquityTrades` + If not present :py:class:`ziplien.sources.SpecificEquityTrades` is the source, with daily frequency in trades. - - simulation_style: optional parameter that configures the + - simulation_style: optional parameter that configures the :py:class:`zipline.finance.trading.TransactionSimulator`. Expects a SIMULATION_STYLE as defined in :py:mod:`zipline.finance.trading` """ assert isinstance(config, dict) - + allocator = config['allocator'] sid = config['sid'] - + #-------------------- # Trading Environment #-------------------- @@ -217,33 +209,33 @@ class SimulatedTrading(object): trading_environment = config['environment'] else: trading_environment = factory.create_trading_environment() - + if config.has_key('order_count'): order_count = config['order_count'] else: order_count = 100 - + if config.has_key('order_amount'): order_amount = config['order_amount'] else: order_amount = 100 - + if config.has_key('trade_count'): trade_count = config['trade_count'] else: # to ensure all orders are filled, we provide one more # trade than order trade_count = 101 - + if config.has_key('simulator_class'): simulator_class = config['simulator_class'] else: simulator_class = Simulator - + simulation_style = config.get('simulation_style') if not simulation_style: simulation_style = SIMULATION_STYLE.FIXED_SLIPPAGE - + #------------------- # Trade Source #------------------- @@ -283,41 +275,41 @@ class SimulatedTrading(object): 
zipline.add_source(trade_source) return zipline - + def add_source(self, source): """ Adds the source to the zipline, sets the sid filter of the source to the algorithm's sid filter. """ - assert isinstance(source, zmsg.DataSource) - self.check_started() + assert isinstance(source, DataSource) + self.check_started() source.set_filter('SID', self.algorithm.get_sid_filter()) self.sim.register_components([source]) self.sources[source.get_id] = source - + def add_transform(self, transform): - assert isinstance(transform, zmsg.BaseTransform) + assert isinstance(transform, BaseTransform) self.check_started() self.sim.register_components([transform]) self.transforms[transform.get_id] = transform - + def add_client(self, client): assert isinstance(client, TradeSimulationClient) self.check_started() self.sim.register_components([client]) self.clients[client.get_id] = client - + def check_started(self): if self.started: raise ZiplineException("TradeSimulation", "You cannot add \ components after the simulation has begun.") - + def get_cumulative_performance(self): return self.trading_client.perf.cumulative_performance.to_dict() - + def publish_to(self, result_socket): self.trading_client.perf.publish_to(result_socket) - + def allocate_sockets(self, n): """ Allocate sockets local to this line, track them so @@ -331,7 +323,7 @@ class SimulatedTrading(object): self.leased_sockets.extend(leased) return leased - + def simulate(self, blocking=False): self.started = True self.sim_context = self.sim.simulate() @@ -341,11 +333,11 @@ class SimulatedTrading(object): def shutdown(self): pass #self.allocator.reaquire(*self.leased_sockets) - + #-------------------------------- # Component property accessors #-------------------------------- - + def get_positions(self): """ returns current positions as a dict. 
draws from the cumulative @@ -354,14 +346,14 @@ class SimulatedTrading(object): perf = self.trading_client.perf.cumulative_performance positions = perf.get_positions() return positions - + class ZiplineException(Exception): def __init__(self, zipline_name, msg): self.name = zipline_name self.message = msg - + def __str__(self): return "Unexpected exception {line}: {msg}".format( - line=self.name, + line=self.name, msg=self.message ) diff --git a/zipline/messaging.py b/zipline/messaging.py deleted file mode 100644 index e1011071..00000000 --- a/zipline/messaging.py +++ /dev/null @@ -1,636 +0,0 @@ -""" -Commonly used messaging components. -""" - -import datetime - -from collections import Counter - -import zipline.util as qutil -from zipline.component import Component -import zipline.protocol as zp -from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ - COMPONENT_STATE, CONTROL_FRAME, CONTROL_UNFRAME - -class ComponentHost(Component): - """ - Components that can launch multiple sub-components, synchronize their - start, and then wait for all components to be finished. - """ - - def __init__(self, addresses): - Component.__init__(self) - self.addresses = addresses - self.running = False - - self.init() - - def init(self): - assert hasattr(self, 'zmq_flavor'), \ - """ You must specify a flavor of ZeroMQ for all - ComponentHost subclasses. """ - - # Component Registry, keyed by get_id - # ---------------------- - self.components = {} - # ---------------------- - # Internal Registry, keyed by guid - self._components = {} - # ---------------------- - - self.sync_register = {} - self.timeout = datetime.timedelta(seconds=60) - - self.feed = Feed() - self.merge = Merge() - self.passthrough = PassthroughTransform() - self.controller = None - - #register the feed and the merge - self.register_components([self.feed, self.merge, self.passthrough]) - - def register_controller(self, controller): - """ - Add the given components to the registry. 
Establish - communication with them. - """ - if self.controller != None: - raise Exception("There can be only one!") - - self.controller = controller - self.controller.zmq_flavor = self.zmq_flavor - - # Propogate the controller to all the subcomponents - for component in self.components.itervalues(): - component.controller = controller - - def register_components(self, components): - """ - Add the given components to the registry. Establish - communication with them. - """ - assert isinstance(components, list) - for component in components: - - component.addresses = self.addresses - component.controller = self.controller - - # Hosts share their zmq flavor with hosted components - component.zmq_flavor = self.zmq_flavor - - self._components[component.guid] = component - self.components[component.get_id] = component - self.sync_register[component.get_id] = datetime.datetime.utcnow() - - if isinstance(component, DataSource): - self.feed.add_source(component.get_id) - if isinstance(component, BaseTransform): - self.merge.add_source(component.get_id) - - def unregister_component(self, component_id): - del self.components[component_id] - del self.sync_register[component_id] - - def setup_sync(self): - """ - Setup the sync socket and poller. ( Bind ) - """ - qutil.LOGGER.debug("Connecting sync server.") - - self.sync_socket = self.context.socket(self.zmq.REP) - self.sync_socket.bind(self.addresses['sync_address']) - - self.sync_poller = self.zmq_poller() - self.sync_poller.register(self.sync_socket, self.zmq.POLLIN) - - self.sockets.append(self.sync_socket) - - def open(self): - for component in self.components.values(): - self.launch_component(component) - self.launch_controller() - - def is_running(self): - """ - DEPRECATED, left in for compatability for now. 
- """ - - cur_time = datetime.datetime.utcnow() - - if len(self.components) == 0: - qutil.LOGGER.info("Component register is empty.") - return False - - return True - - def loop(self, lockstep=True): - - while self.is_running(): - # wait for synchronization request at start, and DONE at end. - # don't timeout. - socks = dict(self.sync_poller.poll()) - - if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN: - msg = self.sync_socket.recv() - - try: - parts = msg.split(':') - sync_id, status = parts - except ValueError as exc: - self.signal_exception(exc) - - if status == str(CONTROL_PROTOCOL.DONE): # TODO: other way around - #qutil.LOGGER.debug("{id} is DONE".format(id=sync_id)) - self.unregister_component(sync_id) - self.state_flag = COMPONENT_STATE.DONE - else: - self.sync_register[sync_id] = datetime.datetime.utcnow() - - #qutil.LOGGER.info("confirmed {id}".format(id=msg)) - # send synchronization reply - self.sync_socket.send('ack', self.zmq.NOBLOCK) - - # ------------------ - # Simulation Control - # ------------------ - - def launch_controller(self, controller): - raise NotImplementedError - - def launch_component(self, component): - raise NotImplementedError - - def teardown_component(self, component): - raise NotImplementedError - - -class Feed(Component): - """ - Connects to N PULL sockets, publishing all messages received to a PUB - socket. Published messages are guaranteed to be in chronological order - based on message property dt. Expects to be instantiated in one execution - context (thread, process, etc) and run in another. - """ - - def __init__(self): - Component.__init__(self) - - self.sent_count = 0 - self.received_count = 0 - self.draining = False - self.ds_finished_counter = 0 - - # Depending on the size of this, might want to use a data - # structure with better asymptotics. 
- self.data_buffer = {} - - # source_id -> integer count - self.sent_counters = Counter() - self.recv_counters = Counter() - - def init(self): - pass - - @property - def get_id(self): - return "FEED" - - @property - def get_type(self): - return COMPONENT_TYPE.CONDUIT - - # ------------- - # Core Methods - # ------------- - - def open(self): - self.pull_socket = self.bind_data() - self.feed_socket = self.bind_feed() - - def do_work(self): - # wait for synchronization reply from the host - socks = dict(self.poll.poll(self.heartbeat_timeout)) - - # TODO: Abstract this out, maybe on base component - if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN: - msg = self.control_in.recv() - event, payload = CONTROL_UNFRAME(msg) - - # -- Heartbeat -- - if event == CONTROL_PROTOCOL.HEARTBEAT: - # Heart outgoing - heartbeat_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.OK, - payload - ) - self.control_out.send(heartbeat_frame) - - # -- Soft Kill -- - elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.signal_done() - self.shutdown() - - # -- Hard Kill -- - elif event == CONTROL_PROTOCOL.KILL: - self.kill() - - - if self.pull_socket in socks and socks[self.pull_socket] == self.zmq.POLLIN: - message = self.pull_socket.recv() - - if message == str(CONTROL_PROTOCOL.DONE): - self.ds_finished_counter += 1 - - if len(self.data_buffer) == self.ds_finished_counter: - #drain any remaining messages in the buffer - qutil.LOGGER.debug("draining feed") - self.drain() - self.signal_done() - else: - try: - event = self.unframe(message) - # deserialization error - except zp.INVALID_DATASOURCE_FRAME as exc: - return self.signal_exception(exc) - - try: - self.append(event) - self.send_next() - - # Invalid message - except zp.INVALID_DATASOURCE_FRAME as exc: - return self.signal_exception(exc) - - def unframe(self, msg): - return zp.DATASOURCE_UNFRAME(msg) - - def frame(self, event): - return zp.FEED_FRAME(event) - - # ------------- - # Flow Control - # ------------- - - def 
drain(self): - """ - Send all messages in the buffer. - """ - self.draining = True - while self.pending_messages() > 0: - self.send_next() - - def send_next(self): - """ - Send the (chronologically) next message in the buffer. - """ - if not (self.is_full() or self.draining): - return - - event = self.next() - if(event != None): - self.feed_socket.send(self.frame(event), self.zmq.NOBLOCK) - self.sent_counters[event.source_id] += 1 - self.sent_count += 1 - - def append(self, event): - """ - Add an event to the buffer for the source specified by - source_id. - """ - self.data_buffer[event.source_id].append(event) - self.recv_counters[event.source_id] += 1 - self.received_count += 1 - - def next(self): - """ - Get the next message in chronological order. - """ - if not(self.is_full() or self.draining): - return - - cur_source = None - earliest_source = None - earliest_event = None - #iterate over the queues of events from all sources - #(1 queue per datasource) - for events in self.data_buffer.values(): - if len(events) == 0: - continue - cur_source = events - first_in_list = events[0] - if first_in_list.dt == None: - #this is a filler event, discard - events.pop(0) - continue - - if (earliest_event == None) or (first_in_list.dt <= earliest_event.dt): - earliest_event = first_in_list - earliest_source = cur_source - - if earliest_event != None: - return earliest_source.pop(0) - - def is_full(self): - """ - Indicates whether the buffer has messages in buffer for - all un-DONE, blocking sources. - """ - for source_id, events in self.data_buffer.iteritems(): - if len(events) == 0: - return False - return True - - def pending_messages(self): - """ - Returns the count of all events from all sources in the - buffer. - """ - total = 0 - for events in self.data_buffer.values(): - total += len(events) - return total - - def add_source(self, source_id): - """ - Add a data source to the buffer. 
- """ - self.data_buffer[source_id] = [] - - def __len__(self): - """ - Buffer's length is same as internal map holding separate - sorted arrays of events keyed by source id. - """ - return len(self.data_buffer) - - -class Merge(Feed): - """ - Merges multiple streams of events into single messages. - """ - - def __init__(self): - Feed.__init__(self) - - self.init() - - def init(self): - pass - - @property - def get_id(self): - return "MERGE" - - @property - def get_type(self): - return COMPONENT_TYPE.CONDUIT - - def open(self): - self.pull_socket = self.bind_merge() - self.feed_socket = self.bind_result() - - def next(self): - """Get the next merged message from the feed buffer.""" - if not (self.is_full() or self.draining): - return - - if self.pending_messages() == 0: - return - - # - #get the raw event from the passthrough transform. - result = self.data_buffer[zp.TRANSFORM_TYPE.PASSTHROUGH].pop(0).PASSTHROUGH - for source, events in self.data_buffer.iteritems(): - if source == zp.TRANSFORM_TYPE.PASSTHROUGH: - continue - if len(events) > 0: - cur = events.pop(0) - result.merge(cur) - return result - - def unframe(self, msg): - return zp.TRANSFORM_UNFRAME(msg) - - def frame(self, event): - return zp.MERGE_FRAME(event) - - def append(self, event): - """ - :param event: a namedict with one entry. key is the name of the - transform, value is the transformed value. - Add an event to the buffer for the source specified by - source_id. - """ - - self.data_buffer[event.keys()[0]].append(event) - self.received_count += 1 - - -class BaseTransform(Component): - """ - Top level execution entry point for the transform - - - connects to the feed socket to subscribe to events - - connects to the result socket (most oftened bound by a TransformsMerge) to PUSH transforms - - processes all messages received from feed, until DONE message received - - pushes all transforms - - sends DONE to result socket, closes all sockets and context - - Parent class for feed transforms. 
Subclass and override transform - method to create a new derived value from the combined feed. - """ - - def __init__(self, name, **kwargs): - Component.__init__(self) - - self.state = { - 'name': name - } - - self.init(**kwargs) - - def init(self): - pass - - @property - def get_id(self): - return self.state['name'] - - @property - def get_type(self): - return COMPONENT_TYPE.CONDUIT - - def open(self): - """ - Establishes zmq connections. - """ - #create the feed. - self.feed_socket = self.connect_feed() - #create the result PUSH - self.result_socket = self.connect_merge() - - def do_work(self): - """ - Loops until feed's DONE message is received: - - - receive an event from the data feed - - call transform (subclass' method) on event - - send the transformed event - - """ - socks = dict(self.poll.poll(self.heartbeat_timeout)) - - # TODO: Abstract this out, maybe on base component - if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN: - msg = self.control_in.recv() - event, payload = CONTROL_UNFRAME(msg) - - # -- Heartbeat -- - if event == CONTROL_PROTOCOL.HEARTBEAT: - # Heart outgoing - heartbeat_frame = CONTROL_FRAME( - CONTROL_PROTOCOL.OK, - payload - ) - self.control_out.send(heartbeat_frame) - - # -- Soft Kill -- - elif event == CONTROL_PROTOCOL.SHUTDOWN: - self.signal_done() - self.shutdown() - - # -- Hard Kill -- - elif event == CONTROL_PROTOCOL.KILL: - self.kill() - - if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN: - message = self.feed_socket.recv() - - if message == str(CONTROL_PROTOCOL.DONE): - self.signal_done() - return - - try: - event = self.unframe(message) - except zp.INVALID_FEED_FRAME as exc: - return self.signal_exception(exc) - - try: - cur_state = self.transform(event) - - # This is overloaded, so it can fail in all sorts of - # unknown ways. Its best to catch it in the - # Transformer itself. 
- except Exception as exc: - return self.signal_exception(exc) - - try: - transform_frame = self.frame(cur_state) - except zp.INVALID_TRANSFORM_FRAME as exc: - return self.signal_exception(exc) - - self.result_socket.send(transform_frame, self.zmq.NOBLOCK) - - def frame(self, cur_state): - return zp.TRANSFORM_FRAME(cur_state['name'], cur_state['value']) - - def unframe(self, msg): - return zp.FEED_UNFRAME(msg) - - def transform(self, event): - """ - Must return the transformed value as a map with:: - - {name:"name of new transform", value: "value of new field"} - - Transforms run in parallel and results are merged into a single map, so - transform names must be unique. Best practice is to use the self.state - object initialized from the transform configuration, and only set the - transformed value:: - - self.state['value'] = transformed_value - """ - raise NotImplementedError - - -class PassthroughTransform(BaseTransform): - """ - A bypass transform which is also an identity transform:: - - +-------+ - +---| f |---> - +-------+ - +------id-------> - - """ - - def __init__(self, **kwargs): - BaseTransform.__init__(self, "PASSTHROUGH") - self.init(**kwargs) - - def init(self, **kwargs): - pass - - @property - def get_type(self): - return COMPONENT_TYPE.CONDUIT - - #TODO, could save some cycles by skipping the _UNFRAME call and just setting value to original msg string. - def transform(self, event): - return {'name':zp.TRANSFORM_TYPE.PASSTHROUGH, 'value': zp.FEED_FRAME(event) } - - -class DataSource(Component): - """ - Baseclass for data sources. Subclass and implement send_all - usually this - means looping through all records in a store, converting to a dict, and - calling send(map). - - Every datasource has a dict property to hold filters:: - - key -- name of the filter, e.g. SID - - value -- a primitive representing the filter. e.g. a list of ints. 
- - Modify the datasource's filters via the set_filter(name, value) - """ - def __init__(self, source_id): - Component.__init__(self) - - self.id = source_id - self.init() - self.filter = {} - - def init(self): - self.cur_event = None - - def set_filter(self, name, value): - self.filter[name] = value - - @property - def get_id(self): - return self.id - - @property - def get_type(self): - return COMPONENT_TYPE.SOURCE - - def open(self): - self.data_socket = self.connect_data() - - def send(self, event): - """ - Emit data. - """ - assert isinstance(event, zp.namedict) - - event['source_id'] = self.get_id - event['type'] = self.get_type - - try: - ds_frame = self.frame(event) - except zp.INVALID_DATASOURCE_FRAME as exc: - return self.signal_exception(exc) - - self.data_socket.send(ds_frame) - - def frame(self, event): - return zp.DATASOURCE_FRAME(event) diff --git a/zipline/optimize/__init__.py b/zipline/optimize/__init__.py index e69de29b..cb12f8fe 100644 --- a/zipline/optimize/__init__.py +++ b/zipline/optimize/__init__.py @@ -0,0 +1,3 @@ +""" +Thomas's parameter optimization library. +""" diff --git a/sloccount.sc b/zipline/profile/__init__.py similarity index 100% rename from sloccount.sc rename to zipline/profile/__init__.py diff --git a/zipline/profile/prof.py b/zipline/profile/prof.py new file mode 100644 index 00000000..d292b300 --- /dev/null +++ b/zipline/profile/prof.py @@ -0,0 +1,104 @@ +""" + +Viscosity - Tools for benchmarking ZeroMQ data flow. 
+ +""" + +import time as timer +import logging +import pycounters +from contextlib import contextmanager, nested +from pycounters import base +from pycounters.shortcuts import frequency, time +from pycounters import shortcuts, reporters, start_auto_reporting, register_reporter +from pycounters import shortcuts,reporters,report_value, output_report, \ +counters, register_counter, _reporting_decorator_context_manager + +JSONFile = "counters.json" + +logger = logging.getLogger('simple_example') +logger.setLevel(logging.DEBUG) + +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) +logger.addHandler(ch) + +reporter = reporters.JSONFileReporter(output_file=JSONFile) +logreport = reporters.LogReporter(logger) +register_reporter(logreport) +register_reporter(reporter) + +class timecontext: + + def __init__(self, name): + self.name = name + + def __enter__(self): + cntr = base.GLOBAL_REGISTRY.get_counter(self.name, throw=False) + if not cntr: + counter = counters.AverageTimeCounter(self.name) + register_counter(counter) + self.tic = timer.time() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if not exc_type: + shortcuts.value(self.name, timer.time() - self.tic) + +class ttimecontext: + + def __init__(self, name): + self.name = name + + def __enter__(self): + counter = base.GLOBAL_REGISTRY.get_counter(self.name, throw=False) + + if not counter: + counter = counters.EventCounter(self.name) + counter.value = 0 + register_counter(counter) + + self.counter = counter + self.tic = timer.time() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if not exc_type: + val = (timer.time() - self.tic) + if not self.counter.value: + self.counter.value = long(0.0) + self.counter.value += val + +class occurancecontext: + + def __init__(self, name): + self.name = name + + def __enter__(self): + cntr = base.GLOBAL_REGISTRY.get_counter(self.name, throw=False) + if not cntr: + cntr = counters.TotalCounter(self.name) + counter = 
counters.TotalCounter(self.name) + register_counter(counter) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + shortcuts.value(self.name, 1) + +if __name__ == '__main__': + + with timecontext('average time'): + for i in xrange(5): + x = [2] * 1000 + timer.sleep(0.01) + + with occurancecontext('totalcount'): + for i in xrange(5): + x = [2] * 1000 + + with ttimecontext('total time'): + for i in xrange(5): + x = [2] * 1000 + timer.sleep(1) + + pycounters.output_report() diff --git a/zipline/protocol.py b/zipline/protocol.py index 90a3184a..374f2fda 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -65,7 +65,7 @@ Namedict Namedicts are dict like objects that have fields accessible by attribute lookup as well as being indexable and iterable:: - HEARTBEAT_PROTOCOL = namedict({ + HEARTBEAT_PROTOCOL = ndict({ 'REQ' : b'\x01', 'REP' : b'\x02', }) @@ -118,13 +118,10 @@ import msgpack import numbers import datetime import pytz -import numpy -import time -import copy from collections import namedtuple -from protocol_utils import Enum, FrameExceptionFactory, namedict -from date_utils import EPOCH, UN_EPOCH +from utils.protocol_utils import Enum, FrameExceptionFactory, ndict +from utils.date_utils import EPOCH, UN_EPOCH # ----------------------- # Control Protocol @@ -221,7 +218,7 @@ def DATASOURCE_FRAME(event): Wraps any datasource payload with id and type, so that unpacking may choose the write UNFRAME for the payload. - :param event: namedict with following properties + :param event: ndict with following properties - *ds_id* an identifier that is unique to the datasource in the context of a component host (e.g. Simulator) - *ds_type* a string denoting the datasource type. Must be on of: @@ -260,42 +257,43 @@ def DATASOURCE_FRAME(event): def DATASOURCE_UNFRAME(msg): """ - - Extracts payload, and calls correct UNFRAME method based on the \ -datasource type passed along. 
- + + Extracts payload, and calls correct UNFRAME method based on the + datasource type passed along. + Returns a dict containing at least: - + - source_id - type other properties are added based on the datasource type: - + - TRADE - + - sid - int security identifier - price - float - volume - int - dt - a datetime object - - """ + """ try: ds_type, source_id, payload = msgpack.loads(msg) assert isinstance(ds_type, int) - rval = namedict({'source_id':source_id}) + + rval = ndict({'source_id':source_id}) + if payload == DATASOURCE_TYPE.EMPTY: - child_value = namedict({'dt':None}) + child_value = ndict({'dt':None}) elif(ds_type == DATASOURCE_TYPE.TRADE): child_value = TRADE_UNFRAME(payload) elif(ds_type == DATASOURCE_TYPE.ORDER): child_value = ORDER_SOURCE_UNFRAME(payload) else: raise INVALID_DATASOURCE_FRAME(msg) - + rval.merge(child_value) return rval - + except TypeError: raise INVALID_DATASOURCE_FRAME(msg) except ValueError: @@ -309,12 +307,12 @@ INVALID_FEED_FRAME = FrameExceptionFactory('FEED') def FEED_FRAME(event): """ - :param event: a nameddict with at least - + :param event: a ndict with at least + - source_id - type """ - assert isinstance(event, namedict) + assert isinstance(event, ndict) source_id = event.source_id ds_type = event.type PACK_DATE(event) @@ -326,7 +324,7 @@ def FEED_UNFRAME(msg): payload = msgpack.loads(msg) #TODO: anything we can do to assert more about the content of the dict? assert isinstance(payload, dict) - rval = namedict(payload) + rval = ndict(payload) UNPACK_DATE(rval) return rval except TypeError: @@ -350,13 +348,13 @@ def TRANSFORM_FRAME(name, value): def TRANSFORM_UNFRAME(msg): """ - :rtype: namedict with : + :rtype: ndict with : """ try: name, value = msgpack.loads(msg) if(value == TRANSFORM_TYPE.EMPTY): - return namedict({name : None}) + return ndict({name : None}) #TODO: anything we can do to assert more about the content of the dict? 
assert isinstance(name, basestring) if(name == TRANSFORM_TYPE.PASSTHROUGH): @@ -364,7 +362,7 @@ def TRANSFORM_UNFRAME(msg): elif(name == TRANSFORM_TYPE.TRANSACTION): value = TRANSACTION_UNFRAME(value) - return namedict({name : value}) + return ndict({name : value}) except TypeError: raise INVALID_TRANSFORM_FRAME(msg) except ValueError: @@ -382,7 +380,7 @@ def MERGE_FRAME(event): - source_id - type """ - assert isinstance(event, namedict) + assert isinstance(event, ndict) PACK_DATE(event) if(event.has_attr(TRANSFORM_TYPE.TRANSACTION)): if(event.TRANSACTION == None): @@ -397,7 +395,7 @@ def MERGE_UNFRAME(msg): payload = msgpack.loads(msg) #TODO: anything we can do to assert more about the content of the dict? assert isinstance(payload, dict) - payload = namedict(payload) + payload = ndict(payload) if(payload.has_attr(TRANSFORM_TYPE.TRANSACTION)): if(payload.TRANSACTION == TRANSFORM_TYPE.EMPTY): payload.TRANSACTION = None @@ -425,7 +423,7 @@ INVALID_TRADE_FRAME = FrameExceptionFactory('TRADE') def TRADE_FRAME(event): """ - :param event: should be a namedict with: + :param event: should be a ndict with: - ds_id -- the datasource id sending this trade out - sid -- the security id @@ -434,7 +432,7 @@ def TRADE_FRAME(event): - dt -- datetime for the trade """ - assert isinstance(event, namedict) + assert isinstance(event, ndict) assert event.type == DATASOURCE_TYPE.TRADE assert isinstance(event.sid, int) assert isinstance(event.price, numbers.Real) @@ -456,7 +454,7 @@ def TRADE_UNFRAME(msg): assert isinstance(sid, int) assert isinstance(price, numbers.Real) assert isinstance(volume, numbers.Integral) - rval = namedict({ + rval = ndict({ 'sid' : sid, 'price' : price, 'volume' : volume, @@ -491,7 +489,7 @@ def ORDER_UNFRAME(msg): sid, amount, dt = msgpack.loads(msg) assert isinstance(sid, int) assert isinstance(amount, int) - rval = namedict({ + rval = ndict({ 'sid':sid, 'amount':amount, 'dt':dt @@ -513,7 +511,7 @@ def ORDER_UNFRAME(msg): def TRANSACTION_FRAME(event): - 
assert isinstance(event, namedict) + assert isinstance(event, ndict) assert isinstance(event.sid, int) assert isinstance(event.price, numbers.Real) assert isinstance(event.commission, numbers.Real) @@ -535,7 +533,7 @@ def TRANSACTION_UNFRAME(msg): assert isinstance(price, numbers.Real) assert isinstance(commission, numbers.Real) assert isinstance(amount, int) - rval = namedict({ + rval = ndict({ 'sid' : sid, 'price' : price, 'amount' : amount, @@ -577,7 +575,7 @@ def ORDER_SOURCE_FRAME(event): def ORDER_SOURCE_UNFRAME(msg): try: sid, amount, dt, source_id, source_type = msgpack.loads(msg) - event = namedict({ + event = ndict({ "sid" : sid, "amount" : amount, "dt" : dt, @@ -688,7 +686,7 @@ def PACK_DATE(event): PACK_DATE and UNPACK_DATE are inverse operations. - :param event: event must a namedict with a property named 'dt' that is a datetime. + :param event: event must a ndict with a property named 'dt' that is a datetime. :rtype: None """ assert isinstance(event.dt, datetime.datetime) @@ -710,7 +708,7 @@ def UNPACK_DATE(event): UNPACK_DATE and PACK_DATE are inverse operations. - :param tuple event: event must a namedict with: + :param tuple event: event must a ndict with: - a property named 'dt_tuple' that is a tuple of integers \ representing the date and time in UTC. @@ -742,15 +740,15 @@ ORDER_PROTOCOL = Enum( ) -#Transform type needs to be a namedict to facilitate merging. -TRANSFORM_TYPE = namedict({ +#Transform type needs to be a ndict to facilitate merging. +TRANSFORM_TYPE = ndict({ 'TRANSACTION' : 'TRANSACTION', #needed? 
'PASSTHROUGH' : 'PASSTHROUGH', 'EMPTY' : '' }) -FINANCE_COMPONENT = namedict({ +FINANCE_COMPONENT = ndict({ 'TRADING_CLIENT' : 'TRADING_CLIENT', 'PORTFOLIO_CLIENT' : 'PORTFOLIO_CLIENT', 'ORDER_SOURCE' : 'ORDER_SOURCE', diff --git a/zipline/test/test_monitor.py b/zipline/test/test_monitor.py deleted file mode 100644 index e69de29b..00000000 diff --git a/zipline/test/test_transforms.py b/zipline/test/test_transforms.py deleted file mode 100644 index 6a2bf204..00000000 --- a/zipline/test/test_transforms.py +++ /dev/null @@ -1,97 +0,0 @@ -from datetime import timedelta -from collections import defaultdict -from unittest2 import TestCase - -import zipline.test.factory as factory -import zipline.util as qutil -from zipline.finance.vwap import DailyVWAP, VWAPTransform -from zipline.finance.returns import ReturnsFromPriorClose -from zipline.finance.movingaverage import MovingAverage -from zipline.lines import SimulatedTrading -from zipline.simulator import AddressAllocator, Simulator - - -allocator = AddressAllocator(1000) - -class ZiplineWithTransformsTestCase(TestCase): - leased_sockets = defaultdict(list) - - def setUp(self): - # skip ahead 100 spots - allocator.lease(100) - qutil.configure_logging() - self.trading_environment = factory.create_trading_environment() - self.zipline_test_config = { - 'allocator':allocator, - 'sid':133 - } - - def test_vwap_tnfm(self): - zipline = SimulatedTrading.create_test_zipline( - **self.zipline_test_config - ) - - vwap = VWAPTransform("vwap_10", daycount=10) - zipline.add_transform(vwap) - - zipline.simulate(blocking=True) - - self.assertTrue(zipline.sim.ready()) - self.assertFalse(zipline.sim.exception) - -class FinanceTransformsTestCase(TestCase): - def setUp(self): - self.trading_environment = factory.create_trading_environment() - - def test_vwap(self): - - trade_history = factory.create_trade_history( - 133, - [10.0, 10.0, 10.0, 11.0], - [100, 100, 100, 300], - timedelta(days=1), - self.trading_environment - ) - - vwap = 
DailyVWAP(daycount=2) - for trade in trade_history: - vwap.update(trade) - - self.assertEqual(vwap.vwap, 10.75) - - - def test_returns(self): - trade_history = factory.create_trade_history( - 133, - [10.0, 10.0, 10.0, 11.0], - [100, 100, 100, 300], - timedelta(days=1), - self.trading_environment - ) - - returns = ReturnsFromPriorClose() - for trade in trade_history: - returns.update(trade) - - - self.assertEqual(returns.returns, .1) - - - def test_moving_average(self): - trade_history = factory.create_trade_history( - 133, - [10.0, 10.0, 10.0, 11.0], - [100, 100, 100, 300], - timedelta(days=1), - self.trading_environment - ) - - ma = MovingAverage(daycount=2) - for trade in trade_history: - ma.update(trade) - - - self.assertEqual(ma.average, 10.5) - - - \ No newline at end of file diff --git a/zipline/test/transform.py b/zipline/test/transform.py deleted file mode 100644 index 0f81a3f5..00000000 --- a/zipline/test/transform.py +++ /dev/null @@ -1,22 +0,0 @@ -from zipline.messaging import BaseTransform -from zipline.protocol import COMPONENT_TYPE - -class DivideByZeroTransform(BaseTransform): - """ - A transform that fails. - """ - - def __init__(self, name): - BaseTransform.__init__(self, "PASSTHROUGH") - self.state['name'] = name - self.init() - - def init(self): - pass - - @property - def get_type(self): - return COMPONENT_TYPE.CONDUIT - - def transform(self, event): - return { 'value': 0/0 } diff --git a/zipline/test/algorithms.py b/zipline/test_algorithms.py similarity index 94% rename from zipline/test/algorithms.py rename to zipline/test_algorithms.py index b3743709..1c1e71c3 100644 --- a/zipline/test/algorithms.py +++ b/zipline/test_algorithms.py @@ -16,8 +16,8 @@ The algorithm must expose methods: of valid sids. List must have a length between 1 and 10. If None is returned the filter will block all events. - - handle_frame: method that accepts a :py:class:`pandas.Dataframe` of the - current state of the simulation universe. 
An example frame:: + - handle_data: method that accepts a :py:class:`zipline.protocol_utils.ndict` + of the current state of the simulation universe. An example data ndict:: +-----------------+--------------+----------------+--------------------+ | | SID(133) | SID(134) | SID(135) | @@ -74,7 +74,7 @@ class TestAlgorithm(): def set_portfolio(self, portfolio): self.portfolio = portfolio - def handle_frame(self, frame): + def handle_data(self, data): self.frame_count += 1 #place an order for 100 shares of sid if self.incr < self.count: @@ -110,7 +110,7 @@ class HeavyBuyAlgorithm(): def set_portfolio(self, portfolio): self.portfolio = portfolio - def handle_frame(self, frame): + def handle_data(self, data): self.frame_count += 1 #place an order for 100 shares of sid self.order(self.sid, self.amount) @@ -133,7 +133,7 @@ class NoopAlgorithm(object): def set_portfolio(self, portfolio): pass - def handle_frame(self, frame): + def handle_data(self, data): pass def get_sid_filter(self): diff --git a/zipline/topology.py b/zipline/topology.py deleted file mode 100644 index b5b92125..00000000 --- a/zipline/topology.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -Contains the various deployable topologies of ziplines. - -This is mostly hardcoded at the moment but as the topologies -becomes more sophisiticated this logic will be the primary -router of sockets. - -Ontology of Stream Processing -============================= - -Source -****** - -A producer of data. The data could be in a datastore, coming from a -socket, etc. To access this data, we pull from the source. Sources increase the -total amount of data flowing through the system. Sources are generally not -pure since they involve IO. - -Sink -**** - -A consumer of data. Basic examples would be a sum function (adding up a -stream of numbers fed in), a datastore sink, a socket etc. We push data -into a sink. When / If a sink completes processing, it may return some -value that exists outside of the system. 
- -Sinks decrease the total amount of information flowing through the system. - -Conduit -******* - -A transformer of data. We push data into a conduit. Similar to a sink, -but instead of returning a single value at the end, a conduit can -return multiple outputs every time it is pushed to. The returned values -remain in the system. - -Conduits may or may not be pure, it is usefull to distinguish between the -two since pure conduits have a variety of nice properties under composition - -""" - -from zipline.protocol import COMPONENT_TYPE - -class Topology(object): - pass - -class DiamondTopology(Topology): - """ - Exposes a feed, merge, and passthrough bypass:: - - +--------+ - +---------->| |---------------+ - | +--------+ | - | v - +---+----+ +---+----+ +--------+ +--------+ +---+----+ - | +-->| +----->| |---------->| |--->| | - +---+----+ +---+----+ +--------+ +--------+ +---+----+ - | ^ - | +--------+ | - +---------->| |---------------+ - | +--------+ | - | | - +------------passthru----------------+ - - """ - - flow = { - 'flow' : COMPONENT_TYPE.SOURCE , - 'serializers' : COMPONENT_TYPE.CONDUIT , - 'transforms' : COMPONENT_TYPE.CONDUIT , - 'merges' : COMPONENT_TYPE.CONDUIT , - 'clients' : COMPONENT_TYPE.SINK , - } - - def __init__(self): - self.sources = [] - self.serializers = [] - self.transforms = [] - self.merges = [] - self.clients = [] diff --git a/zipline/test/test_devsimulator.py b/zipline/toys/__init__.py similarity index 100% rename from zipline/test/test_devsimulator.py rename to zipline/toys/__init__.py diff --git a/zipline/transforms/__init__.py b/zipline/transforms/__init__.py index ea33ca7c..fb244e2e 100644 --- a/zipline/transforms/__init__.py +++ b/zipline/transforms/__init__.py @@ -6,14 +6,20 @@ Transforms provide re-useable components for stream processing. All Transforms expect to receive data events from zipline.core.DataFeed asynchronously via zeromq. 
Each transform is designed to run in independent process space, independently of all other transforms, to allow for parallel -computation. +computation. Each transform must maintain the state necessary to calculate the transform of -each new feed events. +each new feed events. To simplify the consumption of feed and transform data events, this module also provides the TransformsMerge class. TransformsMerge initializes as set of transforms and subscribes to their output. Each feed event is then combined with all the transforms of that event into a single new message. -""" \ No newline at end of file +""" + +from base import BaseTransform + +__all__ = [ + BaseTransform, +] diff --git a/zipline/transforms/base.py b/zipline/transforms/base.py new file mode 100644 index 00000000..90162437 --- /dev/null +++ b/zipline/transforms/base.py @@ -0,0 +1,134 @@ +import logging +from zipline.core.component import Component + +import zipline.protocol as zp +from zipline.protocol import CONTROL_PROTOCOL, COMPONENT_TYPE, \ + CONTROL_FRAME, CONTROL_UNFRAME + +LOGGER = logging.getLogger('ZiplineLogger') + +class BaseTransform(Component): + """ + Top level execution entry point for the transform + + - connects to the feed socket to subscribe to events + - connects to the result socket (most oftened bound by a TransformsMerge) to PUSH transforms + - processes all messages received from feed, until DONE message received + - pushes all transforms + - sends DONE to result socket, closes all sockets and context + + Parent class for feed transforms. Subclass and override transform + method to create a new derived value from the combined feed. + """ + + def __init__(self, name, **kwargs): + Component.__init__(self) + + self.state = { + 'name': name + } + + self.init(**kwargs) + + def init(self): + pass + + @property + def get_id(self): + return self.state['name'] + + @property + def get_type(self): + return COMPONENT_TYPE.CONDUIT + + def open(self): + """ + Establishes zmq connections. 
+ """ + #create the feed. + self.feed_socket = self.connect_feed() + #create the result PUSH + self.result_socket = self.connect_merge() + + def do_work(self): + """ + Loops until feed's DONE message is received: + + - receive an event from the data feed + - call transform (subclass' method) on event + - send the transformed event + + """ + socks = dict(self.poll.poll(self.heartbeat_timeout)) + + # TODO: Abstract this out, maybe on base component + if self.control_in in socks and socks[self.control_in] == self.zmq.POLLIN: + msg = self.control_in.recv() + event, payload = CONTROL_UNFRAME(msg) + + # -- Heartbeat -- + if event == CONTROL_PROTOCOL.HEARTBEAT: + # Heart outgoing + heartbeat_frame = CONTROL_FRAME( + CONTROL_PROTOCOL.OK, + payload + ) + self.control_out.send(heartbeat_frame) + + # -- Soft Kill -- + elif event == CONTROL_PROTOCOL.SHUTDOWN: + self.signal_done() + self.shutdown() + + # -- Hard Kill -- + elif event == CONTROL_PROTOCOL.KILL: + self.kill() + + if self.feed_socket in socks and socks[self.feed_socket] == self.zmq.POLLIN: + message = self.feed_socket.recv() + + if message == str(CONTROL_PROTOCOL.DONE): + self.signal_done() + return + + try: + event = self.unframe(message) + except zp.INVALID_FEED_FRAME as exc: + return self.signal_exception(exc) + + try: + cur_state = self.transform(event) + + # This is overloaded, so it can fail in all sorts of + # unknown ways. Its best to catch it in the + # Transformer itself. 
+ except Exception as exc: + return self.signal_exception(exc) + + try: + transform_frame = self.frame(cur_state) + except zp.INVALID_TRANSFORM_FRAME as exc: + return self.signal_exception(exc) + + self.result_socket.send(transform_frame, self.zmq.NOBLOCK) + + def frame(self, cur_state): + return zp.TRANSFORM_FRAME(cur_state['name'], cur_state['value']) + + def unframe(self, msg): + return zp.FEED_UNFRAME(msg) + + def transform(self, event): + """ + Must return the transformed value as a map with:: + + {name:"name of new transform", value: "value of new field"} + + Transforms run in parallel and results are merged into a single map, so + transform names must be unique. Best practice is to use the self.state + object initialized from the transform configuration, and only set the + transformed value:: + + self.state['value'] = transformed_value + """ + raise NotImplementedError diff --git a/zipline/util.py b/zipline/util.py deleted file mode 100644 index b064306a..00000000 --- a/zipline/util.py +++ /dev/null @@ -1,28 +0,0 @@ -""" -Small classes to assist with timezone calculations, LOGGER configuration, -and other common operations. -""" - -import datetime -import pytz -import logging -import logging.handlers - -LOGGER = logging.getLogger('ZiplineLogger') - -def configure_logging(loglevel=logging.DEBUG): - """ - Configures zipline.util.LOGGER to write a rotating file - (10M per file, 5 files) to `` /var/log/zipline.log ``. 
- """ - LOGGER.setLevel(loglevel) - handler = logging.handlers.RotatingFileHandler( - "/var/log/zipline/{lfn}.log".format(lfn="zipline"), - maxBytes=10*1024*1024, backupCount=5 - ) - handler.setFormatter(logging.Formatter( - "%(asctime)s %(levelname)s %(filename)s %(funcName)s - %(message)s", - "%Y-%m-%d %H:%M:%S %Z") - ) - LOGGER.addHandler(handler) - LOGGER.info("logging started...") diff --git a/zipline/utils/__init__.py b/zipline/utils/__init__.py new file mode 100644 index 00000000..8a1b7b26 --- /dev/null +++ b/zipline/utils/__init__.py @@ -0,0 +1,5 @@ +from protocol_utils import ndict + +__all__ = [ + ndict, +] diff --git a/zipline/date_utils.py b/zipline/utils/date_utils.py similarity index 100% rename from zipline/date_utils.py rename to zipline/utils/date_utils.py diff --git a/zipline/test/factory.py b/zipline/utils/factory.py similarity index 87% rename from zipline/test/factory.py rename to zipline/utils/factory.py index e0fb89e0..88c7676c 100644 --- a/zipline/test/factory.py +++ b/zipline/utils/factory.py @@ -1,6 +1,7 @@ """ Factory functions to prepare useful data for tests. 
""" + import pytz import msgpack import random @@ -8,17 +9,14 @@ from os.path import join from operator import attrgetter from datetime import datetime, timedelta -import zipline import zipline.finance.risk as risk import zipline.protocol as zp -from zipline.sources import SpecificEquityTrades, RandomEquityTrades +from zipline.finance.sources import SpecificEquityTrades, RandomEquityTrades from zipline.finance.trading import TradingEnvironment def load_market_data(): - data_path = join(zipline.__path__[0], "test") - with open(join(data_path, "benchmark.msgpack"), "rb") as fp_bm: - bm_list = msgpack.loads(fp_bm.read()) - + fp_bm = open("./tests/benchmark.msgpack", "rb") + bm_list = msgpack.loads(fp_bm.read()) bm_returns = [] for packed_date, returns in bm_list: event_dt = zp.tuple_to_date(packed_date) @@ -33,8 +31,8 @@ def load_market_data(): bm_returns.append(daily_return) bm_returns = sorted(bm_returns, key=attrgetter('date')) - with open(join(data_path, "treasury_curves.msgpack"), "rb") as fp_tr: - tr_list = msgpack.loads(fp_tr.read()) + fp_tr = open(".//tests/treasury_curves.msgpack", "rb") + tr_list = msgpack.loads(fp_tr.read()) tr_curves = {} for packed_date, curve in tr_list: tr_dt = zp.tuple_to_date(packed_date) @@ -42,7 +40,7 @@ def load_market_data(): tr_curves[tr_dt] = curve return bm_returns, tr_curves - + def create_trading_environment(year=2006): """Construct a complete environment with reasonable defaults""" benchmark_returns, treasury_curves = load_market_data() @@ -58,8 +56,9 @@ def create_trading_environment(year=2006): ) return trading_environment + def create_trade(sid, price, amount, datetime): - row = zp.namedict({ + row = zp.ndict({ 'source_id' : "test_factory", 'type' : zp.DATASOURCE_TYPE.TRADE, 'sid' : sid, @@ -83,7 +82,7 @@ def create_trade_history(sid, prices, amounts, interval, trading_calendar): current = trading_calendar.first_open for price, amount in zip(prices, amounts): - + trade = create_trade(sid, price, amount, current) 
trades.append(trade) current = get_next_trading_dt(current, interval, trading_calendar) @@ -92,11 +91,11 @@ def create_trade_history(sid, prices, amounts, interval, trading_calendar): return trades def create_txn(sid, price, amount, datetime, btrid=None): - txn = zp.namedict({ - 'sid':sid, - 'amount':amount, - 'dt':datetime, - 'price':price, + txn = zp.ndict({ + 'sid' : sid, + 'amount' : amount, + 'dt' : datetime, + 'price' : price, }) return txn @@ -172,7 +171,7 @@ def create_random_trade_source(sid, trade_count, trading_environment): return source def create_daily_trade_source(sids, trade_count, trading_environment): - + """ creates trade_count trades for each sid in sids list. first trade will be on trading_environment.period_start, and daily @@ -183,9 +182,9 @@ def create_daily_trade_source(sids, trade_count, trading_environment): to match the day of the final trade. """ return create_trade_source( - sids, - trade_count, - timedelta(days=1), + sids, + trade_count, + timedelta(days=1), trading_environment ) @@ -193,26 +192,28 @@ def create_daily_trade_source(sids, trade_count, trading_environment): def create_minutely_trade_source(sids, trade_count, trading_environment): """ - creates trade_count trades for each sid in sids list. - first trade will be on trading_environment.period_start, and every minute - thereafter for each sid. Thus, two sids should result in two trades per - minute. + creates trade_count trades for each sid in sids list. + first trade will be on trading_environment.period_start, and every minute + thereafter for each sid. Thus, two sids should result in two trades per + minute. Important side-effect: trading_environment.period_end will be modified - to match the day of the final trade. + to match the day of the final trade. 
""" return create_trade_source( - sids, - trade_count, - timedelta(minutes=1), + sids, + trade_count, + timedelta(minutes=1), trading_environment ) def create_trade_source(sids, trade_count, trade_time_increment, trading_environment): trade_history = [] + + price = [10.1] * trade_count + volume = [100] * trade_count + for sid in sids: - price = [10.1] * trade_count - volume = [100] * trade_count start_date = trading_environment.first_open generated_trades = create_trade_history( diff --git a/zipline/gpoll.py b/zipline/utils/gpoll.py similarity index 100% rename from zipline/gpoll.py rename to zipline/utils/gpoll.py diff --git a/zipline/utils/logger.py b/zipline/utils/logger.py new file mode 100644 index 00000000..287171c5 --- /dev/null +++ b/zipline/utils/logger.py @@ -0,0 +1,13 @@ +""" +Small classes to assist with timezone calculations, LOGGER configuration, +and other common operations. +""" + +import logging +import logging.config + +def configure_logging(): + logging.config.fileConfig( + 'logging.cfg', + disable_existing_loggers = False + ) diff --git a/zipline/protocol_utils.py b/zipline/utils/protocol_utils.py similarity index 68% rename from zipline/protocol_utils.py rename to zipline/utils/protocol_utils.py index 60c90814..376fc8c7 100644 --- a/zipline/protocol_utils.py +++ b/zipline/utils/protocol_utils.py @@ -31,79 +31,6 @@ def FrameExceptionFactory(name): return InvalidFrame -class namedict(MutableMapping): - """ - - Namedicts are dict like objects that have fields accessible by attribute lookup - as well as being indexable and iterable:: - - HEARTBEAT_PROTOCOL = namedict({ - 'REQ' : b'\x01', - 'REP' : b'\x02', - }) - - HEARTBEAT_PROTOCOL.REQ # syntactic sugar - HEARTBEAT_PROTOCOL.REP # oh suga suga - - For more complex structs use collections.namedtuple: - """ - - def __init__(self, dct=None): - if(dct): - self.__dict__.update(dct) - - def __setitem__(self, key, value): - """ - Required for use by pymongo as_class parameter to find. 
- """ - if(key == '_id'): - self.__dict__['id'] = value - else: - self.__dict__[key] = value - - def __getitem__(self, key): - return self.__dict__[key] - - def __delitem__(self, key): - del self.__dict__[key] - - def __iter__(self): - return self.__dict__.iterkeys() - - def __len__(self): - return len(self.__dict__) - - def keys(self): - return self.__dict__.keys() - - def as_dict(self): - # shallow copy is O(n) - return copy.copy(self.__dict__) - - def delete(self, key): - del(self.__dict__[key]) - - def merge(self, other_nd): - assert isinstance(other_nd, namedict) - self.__dict__.update(other_nd.__dict__) - - def __repr__(self): - return "namedict: " + str(self.__dict__) - - def __eq__(self, other): - # !!!!!!!!!!!!!!!!!!!! - # !!!! DANGEROUS !!!!! - # !!!!!!!!!!!!!!!!!!!! - return other != None and self.__dict__ == other.__dict__ - - def has_attr(self, name): - return self.__dict__.has_key(name) - - def as_series(self): - s = pandas.Series(self.__dict__) - s.name = self.sid - return s - class ndict(MutableMapping): """ Xtreme Namedicts 2.0 @@ -123,6 +50,13 @@ class ndict(MutableMapping): # Abstact Overloads # ----------------- + def __setattr__(self, key, value): + if '_ndict' in key or key == 'cls': + self.__dict__[key] = value + else: + self.__internal[key] = value + return value + def __setitem__(self, key, value): """ Required for use by pymongo as_class parameter to find. @@ -132,7 +66,6 @@ class ndict(MutableMapping): else: self.__internal[key] = value - def __getattr__(self, key): if key in self.cls: return self.__dict__[key] @@ -219,3 +152,23 @@ class ndict(MutableMapping): #return False #return True + +# This is not neccesarily the most intuitive construction, but +# we're aiming for raw performance rather than readability. So +# we do things that we would not normally do in business logic. 
def namelookup(dct):
    """
    Build a fixed, attribute-style lookup object from a mapping.

    The keys and values of ``dct`` are snapshotted when this function is
    called; later changes to ``dct`` do not affect the returned object.

    :param dct: mapping of attribute name (a string usable as a slot
        name) to value.
    :returns: an instance of a one-off ``_lookup`` class whose
        ``__slots__`` are the keys of ``dct`` and whose attributes hold
        the corresponding values.
    :raises Exception: raised by the returned object whenever an
        attribute is assigned after construction.
    """
    # Take names and values from a single items() pass so they stay paired.
    pairs = tuple(dct.items())
    names = tuple(k for k, _ in pairs)

    class _lookup(object):
        __slots__ = names

        def __init__(self):
            # Go through object.__setattr__ directly: the class-level
            # __setattr__ below rejects every assignment.
            for k, v in pairs:
                object.__setattr__(self, k, v)

        def locked(self, k, v):
            raise Exception('Name lookups are fixed at init.')

        # Special methods are looked up on the type, not the instance,
        # so the lock has to be installed on the class to take effect.
        __setattr__ = locked

        def __repr__(self):
            return '<lookup %r>' % (list(names),)

    return _lookup()


# diff metadata: zipline/serial.py renamed to zipline/utils/serial.py
#                (similarity index 100%)
# diff metadata: zipline/zmq_utils.py renamed to zipline/utils/zmq_utils.py
#                (similarity index 100%)
# diff metadata: zipline/version.py added as a new file; its contents follow.

BANNER = """
Zipline {version}
Released under BSD3
""".strip()

VERSION = (0, 0, 1, 'dev')


def pretty_version():
    """
    Return ``BANNER`` with ``VERSION`` rendered as a dotted string.

    ``VERSION`` mixes ints and strings, so every component is converted
    with ``str`` before joining.
    """
    return BANNER.format(version='.'.join(str(part) for part in VERSION))