diff --git a/README.md b/README.md index ae9fb91b..58e3c0b4 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,6 @@ Scientific python on the Mac can be a bit confusing because of the many independ - umfpack - you need this to build scipy. ```brew install umfpack``` - swig - you need this to build scipy. ```brew install swig``` - hdf5 - you need this to build tables. ```brew install hdf5``` -- zeromq - you need this to run qbt. ```brew install zmq``` Data Sources diff --git a/etc/requirements.txt b/etc/requirements.txt index a620712d..9664715d 100644 --- a/etc/requirements.txt +++ b/etc/requirements.txt @@ -2,13 +2,6 @@ msgpack-python==0.1.12 humanhash==0.0.1 iso8601==0.1.4 -# ZeroMQ -pyzmq==2.1.11 -gevent-zeromq==0.2.2 - -# Unix -setproctitle==1.1.6 - # Logging Logbook==0.3 diff --git a/etc/requirements_sci.txt b/etc/requirements_sci.txt index f138534c..02a76bef 100644 --- a/etc/requirements_sci.txt +++ b/etc/requirements_sci.txt @@ -16,5 +16,3 @@ patsy==0.1.0 statsmodels>=0.5.0 scikit-learn==0.11 -# ZeroMQ -pyzmq==2.1.11 diff --git a/pavement.py b/pavement.py index 5353c8a3..e46e5cc5 100644 --- a/pavement.py +++ b/pavement.py @@ -110,7 +110,6 @@ options( # Because I'm lazy stuff_i_want_in_my_debug_shell = [ ('qutil', 'zipline.util', []), - ('zmq', 'zmq', []), ] @task diff --git a/tests/test_exception_handling.py b/tests/test_exception_handling.py index 7b4fb624..8e21f35b 100644 --- a/tests/test_exception_handling.py +++ b/tests/test_exception_handling.py @@ -1,4 +1,3 @@ -import zmq from unittest2 import TestCase from collections import defaultdict @@ -9,7 +8,7 @@ from zipline.finance.trading import SIMULATION_STYLE from zipline.core.devsimulator import AddressAllocator from zipline.lines import SimulatedTrading from zipline.gens.transform import StatefulTransform -from zipline.gens.tradesimulation import MAX_HEARTBEAT_INTERVALS +from zipline.utils.timeout import TimeoutException from zipline.utils.test_utils import ( drain_zipline, @@ -22,7 +21,6 @@ from zipline.utils.test_utils import ( DEFAULT_TIMEOUT = 15 # seconds EXTENDED_TIMEOUT = 90 -allocator = AddressAllocator(1000) class ExceptionTestCase(TestCase): @@ -31,14 +29,11 @@ class ExceptionTestCase(TestCase): def setUp(self): self.zipline_test_config = { 'sid' : 133, - 'results_socket_uri' : allocator.lease(1)[0], 'simulation_style' : SIMULATION_STYLE.FIXED_SLIPPAGE } - self.ctx = zmq.Context() setup_logger(self) def tearDown(self): - self.ctx.term() teardown_logger(self) def test_datasource_exception(self): @@ -46,15 +41,15 @@ class ExceptionTestCase(TestCase): zipline = SimulatedTrading.create_test_zipline( **self.zipline_test_config ) - output, _ = drain_zipline(self, zipline) - assert len(output) == 1 - assert output[0]['prefix'] == 'EXCEPTION' - message = output[0]['payload'] - for field in ['date', 'message', 'name', 'stack']: - assert field in message.keys() - assert message['message'] == 'integer division or modulo by zero' - assert message['name'] == 'ZeroDivisionError' + with self.assertRaises(ZeroDivisionError) as ctx: + output, _ = drain_zipline(self, zipline) + + self.assertEqual( + ctx.exception.message, + 'integer division or modulo by zero' + ) + def test_tranform_exception(self): exc_tnfm = StatefulTransform(ExceptionTransform) @@ -63,16 +58,11 @@ class ExceptionTestCase(TestCase): zipline = SimulatedTrading.create_test_zipline( **self.zipline_test_config ) - output, _ = drain_zipline(self, zipline) - assert len(output) == 1 - assert output[0]['prefix'] == 'EXCEPTION' - message = output[0]['payload'] - for field in ['date', 'message', 'name', 'stack']: - assert field in message.keys() - assert message['message'] == 'An assertion message' - assert message['name'] == 'AssertionError' + with self.assertRaises(AssertionError) as ctx: + output, _ = drain_zipline(self, zipline) + self.assertEqual(ctx.exception.message,'An assertion message') def test_exception_in_init(self): # Simulation @@ -86,16 +76,11 @@ class ExceptionTestCase(TestCase): zipline = SimulatedTrading.create_test_zipline( **self.zipline_test_config ) - output, _ = drain_zipline(self, zipline) - self.assertEqual(output[-1]['prefix'], 'EXCEPTION') - payload = output[-1]['payload'] - self.assertTrue(payload['date']) - self.assertEqual(payload['message'],'Algo exception in initialize') - self.assertEqual(payload['name'],'Exception') - # make sure our path shortening is working - self.assertEqual(payload['stack'][0]['filename'], '/zipline/lines.py') - self.assertEqual(payload['stack'][-1]['filename'], '/zipline/test_algorithms.py') + with self.assertRaises(Exception) as ctx: + output, _ = drain_zipline(self, zipline) + + self.assertEqual(ctx.exception.message,'Algo exception in initialize') def test_exception_in_handle_data(self): # Simulation @@ -110,16 +95,10 @@ class ExceptionTestCase(TestCase): **self.zipline_test_config ) - output, _ = drain_zipline(self, zipline) - self.assertEqual(output[-1]['prefix'], 'EXCEPTION') - payload = output[-1]['payload'] - self.assertTrue(payload['date']) - del payload['date'] - self.assertEqual(payload['message'],'Algo exception in handle_data') - self.assertEqual(payload['name'],'Exception') - # make sure our path shortening is working - self.assertEqual(payload['stack'][0]['filename'], '/zipline/lines.py') - self.assertEqual(payload['stack'][-1]['filename'], '/zipline/test_algorithms.py') + with self.assertRaises(Exception) as ctx: + output, _ = drain_zipline(self, zipline) + + self.assertEqual(ctx.exception.message,'Algo exception in handle_data') def test_zerodivision_exception_in_handle_data(self): @@ -134,36 +113,30 @@ class ExceptionTestCase(TestCase): **self.zipline_test_config ) - output, _ = drain_zipline(self, zipline) + with self.assertRaises(ZeroDivisionError) as ctx: + output, _ = drain_zipline(self, zipline) + + self.assertEqual(ctx.exception.message,'integer division or modulo by zero') - self.assertEqual(output[-1]['prefix'], 'EXCEPTION') - payload = output[-1]['payload'] - self.assertTrue(payload['date']) - del payload['date'] - self.assertEqual(payload['message'],'integer division or modulo by zero') - self.assertEqual(payload['name'],'ZeroDivisionError') - # make sure our path shortening is working - self.assertEqual(payload['stack'][0]['filename'], '/zipline/lines.py') - self.assertEqual(payload['stack'][-1]['filename'], '/zipline/test_algorithms.py') def test_initialize_timeout(self): - + self.zipline_test_config['algorithm'] = \ InitializeTimeoutAlgorithm( self.zipline_test_config['sid'] ) - + zipline = SimulatedTrading.create_test_zipline( **self.zipline_test_config ) - output, _ = drain_zipline(self, zipline) - self.assertEqual(output[-1]['prefix'], 'EXCEPTION') - payload = output[-1]['payload'] - self.assertEqual(payload['name'],'TimeoutException') - self.assertEqual(payload['message'], 'Call to initialize timed out') + + with self.assertRaises(TimeoutException) as ctx: + output, _ = drain_zipline(self, zipline) + + self.assertEqual(ctx.exception.message,'Call to initialize timed out') def test_heartbeat(self): - + self.zipline_test_config['algorithm'] = \ TooMuchProcessingAlgorithm( self.zipline_test_config['sid'] @@ -171,20 +144,11 @@ class ExceptionTestCase(TestCase): zipline = SimulatedTrading.create_test_zipline( **self.zipline_test_config ) - output, _ = drain_zipline(self, zipline) - - # There should be a message for each hearbeat, plus a message - # for the final timeout. - assert len(output) == MAX_HEARTBEAT_INTERVALS + 1 - # Assert that everything but the last message is a heartbeat log. - for message in output[0:-1]: - assert message['prefix'] == 'LOG' - assert message['payload']['func_name'] == 'log_heartbeats' + with self.assertRaises(TimeoutException) as ctx: + output, _ = drain_zipline(self, zipline) - # Assert that the last message is a timeout exception. - self.assertEqual(output[-1]['prefix'], 'EXCEPTION') - payload = output[-1]['payload'] - self.assertEqual(payload['name'],'TimeoutException') - self.assertEqual(payload['message'], 'Too much time spent in handle_data call') - + self.assertEqual( + ctx.exception.message, + 'Too much time spent in handle_data call' + ) diff --git a/tests/test_finance.py b/tests/test_finance.py index 87795609..aab30554 100644 --- a/tests/test_finance.py +++ b/tests/test_finance.py @@ -2,7 +2,6 @@ Tests for the zipline.finance package """ import pytz -import zmq from unittest2 import TestCase from datetime import datetime, timedelta @@ -13,7 +12,6 @@ from nose.tools import timed import zipline.utils.factory as factory from zipline.finance.trading import TradingEnvironment -from zipline.core.devsimulator import AddressAllocator from zipline.lines import SimulatedTrading from zipline.finance.performance import PerformanceTracker from zipline.utils.protocol_utils import ndict @@ -27,7 +25,6 @@ from zipline.utils.test_utils import \ DEFAULT_TIMEOUT = 15 # seconds EXTENDED_TIMEOUT = 90 -allocator = AddressAllocator(1000) class FinanceTestCase(TestCase): @@ -36,9 +33,7 @@ class FinanceTestCase(TestCase): def setUp(self): self.zipline_test_config = { 'sid' : 133, - 'results_socket_uri' : allocator.lease(1)[0] } - self.ctx = zmq.Context() setup_logger(self) diff --git a/tests/test_optimize.py b/tests/test_optimize.py index c16454b1..365a2a23 100644 --- a/tests/test_optimize.py +++ b/tests/test_optimize.py @@ -12,8 +12,6 @@ from zipline.core.devsimulator import AddressAllocator DEFAULT_TIMEOUT = 15 # seconds EXTENDED_TIMEOUT = 90 -allocator = AddressAllocator(1000) - from zipline.utils.test_utils import setup_logger, teardown_logger class TestUpDown(TestCase): @@ -26,7 +24,6 @@ class TestUpDown(TestCase): def setUp(self): self.zipline_test_config = { - 'allocator' : allocator, 'sid' : 133, 'trade_count' : 5, 'amplitude' : 30, diff --git a/tests/test_protocol.py b/tests/test_protocol.py deleted file mode 100644 index d1606ed4..00000000 --- a/tests/test_protocol.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -Test the FRAME/UNFRAME functions in the sequence expected from ziplines. -""" -from unittest2 import TestCase -from datetime import datetime, timedelta -from collections import defaultdict - -from nose.tools import timed - -import zipline.utils.factory as factory -import zipline.protocol as zp - - -DEFAULT_TIMEOUT = 5 # seconds - -class ProtocolTestCase(TestCase): - - leased_sockets = defaultdict(list) - - def setUp(self): - #qutil.configure_logging() - self.trading_environment = factory.create_trading_environment() - - @timed(DEFAULT_TIMEOUT) - def test_trade_feed_protocol(self): - - sid = 133 - price = [10.0] * 4 - volume = [100] * 4 - - start_date = datetime.strptime("02/15/2012","%m/%d/%Y") - one_day_td = timedelta(days=1) - - trades = factory.create_trade_history( - sid, - price, - volume, - one_day_td, - self.trading_environment - ) - - for trade in trades: - #simulate data source sending frame - msg = zp.DATASOURCE_FRAME(zp.ndict(trade)) - #feed unpacking frame - recovered_trade = zp.DATASOURCE_UNFRAME(msg) - #feed sending frame - feed_msg = zp.FEED_FRAME(recovered_trade) - #transform unframing - recovered_feed = zp.FEED_UNFRAME(feed_msg) - #do a transform - trans_msg = zp.TRANSFORM_FRAME('helloworld', 2345.6) - #simulate passthrough transform -- passthrough shouldn't even - # unpack the msg, just resend. - - passthrough_msg = zp.TRANSFORM_FRAME(zp.TRANSFORM_TYPE.PASSTHROUGH,\ - feed_msg) - - #merge unframes transform and passthrough - trans_recovered = zp.TRANSFORM_UNFRAME(trans_msg) - pt_recovered = zp.TRANSFORM_UNFRAME(passthrough_msg) - #simulated merge - pt_recovered.PASSTHROUGH.merge(trans_recovered) - #frame the merged event - merged_msg = zp.MERGE_FRAME(pt_recovered.PASSTHROUGH) - #unframe the merge and validate values - event = zp.MERGE_UNFRAME(merged_msg) - - #check the transformed value, should only be in event, not trade. - self.assertTrue(event.helloworld == 2345.6) - event.delete('helloworld') - - self.assertEqual(zp.ndict(trade), event) diff --git a/zipline/__init__.py b/zipline/__init__.py index 31272fcb..4151cd02 100644 --- a/zipline/__init__.py +++ b/zipline/__init__.py @@ -5,10 +5,8 @@ Zipline # This is *not* a place to dump arbitrary classes/modules for convenience, # it is a place to expose the public interfaces. -import protocol # namespace from utils.protocol_utils import ndict __all__ = [ - protocol, ndict ] diff --git a/zipline/lines.py b/zipline/lines.py index 75c473f0..1bc5cc90 100644 --- a/zipline/lines.py +++ b/zipline/lines.py @@ -59,16 +59,9 @@ before invoking simulate. | __init__. | +---------------------------------+ """ -import sys -import zmq -import os -from signal import SIGHUP, SIGINT -import multiprocessing -from setproctitle import setproctitle from zipline.test_algorithms import TestAlgorithm from zipline.finance.trading import SIMULATION_STYLE -from zipline.utils.log_utils import ZeroMQLogHandler from zipline.utils import factory from zipline.gens.composites import ( @@ -78,8 +71,6 @@ from zipline.gens.composites import ( from zipline.gens.tradesimulation import TradeSimulationClient as tsc from logbook import Logger -import zipline.protocol as zp - log = Logger('Lines') @@ -90,10 +81,27 @@ class SimulatedTrading(object): transforms, algorithm, environment, - style, - results_socket_uri, - context, - sim_id): + style): + """ + @sources - an iterable of iterables + These iterables must yield ndicts that contain: + - type :: a ziplines.protocol.DATASOURCE_TYPE + - dt :: a milliseconds since epoch timestamp in UTC + + @transforms - An iterable of instances of StatefulTransform. + + @algorithm - An object that implements: + `def initialize(self)` + `def handle_data(self, data)` + `def get_sid_filter(self)` + `def set_logger(self, logger)` + `def set_order(self, order_callable)` + + @environment - An instance of finance.trading.TradingEnvironment + + @style - protocol.SIMULATION_STYLE + """ + self.date_sorted = date_sorted_sources(*sources) self.transforms = transforms @@ -102,138 +110,12 @@ class SimulatedTrading(object): *self.transforms) self.trading_client = tsc(algorithm, environment, style) self.gen = self.trading_client.simulate(self.with_tnfms) - self.results_uri = results_socket_uri - self.results_socket = None - self.context = context - self.sim_id = sim_id - # optional process if we fork simulate into an - # independent process. - self.proc = None - self.send_sighup = False - self.logger = Logger(sim_id) - self.print_logger = Logger('Print') + def __iter__(self): + return self - # exit status flag - self.success = False - - def simulate(self, blocking=True, send_sighup=False): - - # for non-blocking, - if blocking: - self.run_gen() - else: - self.send_sighup = send_sighup - return self.fork_and_sim() - - def fork_and_sim(self): - self.proc = multiprocessing.Process(target=self.run_gen) - self.proc.start() - return self.proc - - def run_gen(self): - setproctitle(self.sim_id) - self.open() - if self.zmq_out: - with self.zmq_out.threadbound(): - self.stream_results() - # if no log socket, just run the algo normally - else: - self.stream_results() - - def stream_results(self): - assert self.results_socket, \ - "Results socket must exist to stream results" - try: - for event in self.gen: - if 'daily_perf' in event: - msg = zp.PERF_FRAME(event) - else: - msg = zp.RISK_FRAME(event) - self.results_socket.send(msg) - - self.signal_done() - self.success = True - except Exception as exc: - self.handle_exception(exc) - finally: - # not much to do besides log our exit. - self.close() - - def signal_done(self): - # notify monitor we're done - done_frame = zp.DONE_FRAME('success') - self.results_socket.send(done_frame) - - def close(self): - log.info("Closing Simulation: {id}".format(id=self.sim_id)) - if self.results_socket: - self.results_socket.close() - if self.proc and self.send_sighup: - ppid = os.getppid() - if self.success: - log.warning("Sending SIGHUP") - os.kill(ppid, SIGHUP) - else: - log.warning("Sending SIGINT") - os.kill(ppid, SIGINT) - - def handle_exception(self, exc): - self.signal_exception(exc) - - def signal_exception(self, exc=None): - """ - All exceptions inside any component should boil back to - this handler. - - Will inform the system that the component has failed and how it - has failed. - """ - exc_type, exc_value, exc_traceback = sys.exc_info() - - try: - log.exception('{id} sending exception to result stream.'\ - .format(id=self.sim_id)) - msg = zp.EXCEPTION_FRAME( - exc_traceback, - exc_type.__name__, - exc_value.message - ) - - self.results_socket.send(msg) - except: - log.exception("Exception while reporting simulation exception.") - - def open(self): - if not self.context: - self.context = zmq.Context() - if self.results_uri: - sock = self.context.socket(zmq.PUSH) - sock.connect(self.results_uri) - self.results_socket = sock - self.setup_logging() - - def setup_logging(self): - assert self.results_socket - # The filter behavior is: matches are logged, mismatches - # are bubbled. If bubble is True, matches are also - # bubbled. Since we do not want user logs in our system - # logs, we set bubble to False. - self.zmq_out = ZeroMQLogHandler( - socket=self.results_socket, - filter=lambda r, h: r.channel in ['Print', 'AlgoLog'], - bubble=False - ) - - def join(self): - if self.proc: - self.proc.join() - - def get_pids(self): - if self.proc: - return [self.proc.pid] - else: - return [] + def next(self): + return self.gen.next() @staticmethod def create_test_zipline(**config): @@ -297,10 +179,6 @@ class SimulatedTrading(object): if not simulation_style: simulation_style = SIMULATION_STYLE.FIXED_SLIPPAGE - zmq_context = config.get('zmq_context', None) - simulation_id = config.get('simulation_id', 'test_simulation') - results_socket_uri = config.get('results_socket_uri', None) - #------------------- # Trade Source #------------------- @@ -341,52 +219,7 @@ class SimulatedTrading(object): test_algo, trading_environment, simulation_style, - results_socket_uri, - zmq_context, - simulation_id) + ) #------------------- return sim - - -class SimulatedTradingLite(object): - """ - SimulatedTrading without multiprocess and without zmq. - Useful for profiling the core logic and for rapid testing - of new features. - """ - def __init__(self, - sources, - transforms, - algorithm, - environment, - style): - """ - @sources - an iterable of iterables - These iterables must yield ndicts that contain: - - type :: a ziplines.protocol.DATASOURCE_TYPE - - dt :: a milliseconds since epoch timestamp in UTC - - @transforms - An iterable of instances of StatefulTransform. - - @algorithm - An object that implements: - `def initialize(self)` - `def handle_data(self, data)` - `def get_sid_filter(self)` - `def set_logger(self, logger)` - `def set_order(self, order_callable)` - - @environment - An instance of finance.trading.TradingEnvironment - - @style - protocol.SIMULATION_STYLE - """ - self.date_sorted = date_sorted_sources(*sources) - self.transforms = transforms - # Formerly merged_transforms. - self.with_tnfms = sequential_transforms(self.date_sorted, - *self.transforms) - self.trading_client = tsc(algorithm, environment, style) - self.gen = self.trading_client.simulate(self.with_tnfms) - - def get_results(self): - return self.gen diff --git a/zipline/protocol.py b/zipline/protocol.py index f9c1326b..a9bc0ef2 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -1,668 +1,4 @@ -""" -The messaging protocol for Zipline. - -Asserts are in place because any protocol error corresponds to a -programmer error so we want it to fail fast and in an obvious way -so it doesn't happen again. ZeroMQ follows the same philosophy. - -Notes -===== - -Msgpack -------- -Msgpack is the fastest serialization protocol in Python at the -moment. Its 100% C is typically orders of magnitude faster than -json and pickle making it awesome for ZeroMQ. - -You can only serialize Python structural primitives: strings, -numeric types, dicts, tuples and lists. Any any recursive -combinations of these. - -Basically every basestring in Python corresponds to valid -msgpack message since the protocol is highly error tolerant. -Just keep in mind that if you ever unpack a raw msgpack string -make sure it looks like what you intend and/or catch ValueError -and TypeError exceptions. - -It also has the nice benefit of never invoking ``eval`` ( unlike -json and pickle) which is a major security boon since it is -impossible to arbitrary code for evaluation through messages. - -UltraJSON ---------- -For anything going to the browser UltraJSON is the fastest -serializer, its mostly C as well. - -The same domain of serialization as msgpack applies: Python -structural primitives. It also has the additional constraint -that anything outside of UTF8 can cause serious problems, so if -you have a strong desire to JSON encode ancient Sanskrit -( admit it, we all do ), just say no. - -Data Structures -=============== - -Enum ----- - -Classic C style enumeration:: - - opts = Enum('FOO', 'BAR') - - opts.FOO # 0 - opts.BAR # 1 - opts.FOO = opts.BAR # False - -Oh, and if you do this:: - - protocol.Enum([1,2,3]) - -Your interpreter will segfault, think of this like an extreme assert. - -Namedict --------- - -Namedicts are dict like objects that have fields accessible by attribute lookup -as well as being indexable and iterable:: - - HEARTBEAT_PROTOCOL = ndict({ - 'REQ' : b'\x01', - 'REP' : b'\x02', - }) - - HEARTBEAT_PROTOCOL.REQ # syntactic sugar - HEARTBEAT_PROTOCOL.REP # oh suga suga - - HEARTBEAT_PROTOCOL['REQ'] # classic dictionary index - -Namedtuple ----------- - -From the standard library, namedtuples are great for specifying -containers for spec'ing data container objects:: - - from collections import namedtuple - - Person = namedtuple('Person', 'name age gender') - bob = Person(name='Bob', age=30, gender='male') - - bob.name # 'Bob' - bob.age # 30 - bob.gender # male - - # The slots on the tuple are also finite and read-only. This - # is a good thing, keeps us honest! - - bob.hobby = 'underwater archery' - # Will raise: - # AttributeError: 'Person' object has no attribute 'hobby' - - bob.name = 'joe' - # Will raise: - # AttributeError: can't set attribute - - # Namedtuples are normally read-only, but you can change the - # internals using a private operation. - bob._replace(gender='female') - - # You can also dump out to dictionary form: - OrderedDict([('name', 'Bob'), ('age', 30), ('gender', 'male')]) - - # Or JSON. - json.dumps(bob._asdict()) - '{"gender":"male","age":30,"name":"Bob"}' - -""" - -import msgpack -import numbers -import datetime -import pytz -import traceback -import re -import os - -from collections import namedtuple - -from utils.protocol_utils import Enum, FrameExceptionFactory, ndict, namelookup -from utils.date_utils import EPOCH, UN_EPOCH, epoch_now - -# ----------------------- -# Control Protocol -# ----------------------- - -PRODUCTION_PREFIXES = ['PERF', 'RISK', 'EXCEPTION','CANCEL','DONE', 'LOG'] -PRICE_FIELDS = ['price', 'open', 'close', 'high', 'low'] - -INVALID_CONTROL_FRAME = FrameExceptionFactory('CONTROL') - -CONTROL_STATES = Enum( - 'INIT', - 'SOURCES_READY', - 'RUNNING', - 'TERMINATE', -) - -CONTROL_PROTOCOL = Enum( - 'HEARTBEAT' , # 0 - req - 'SHUTDOWN' , # 1 - req - 'KILL' , # 2 - req - 'GO' , # - req - - 'OK' , # 3 - rep - 'DONE' , # 4 - rep - 'EXCEPTION' , # 5 - rep - 'READY' , # 6 - rep -) - -def CONTROL_FRAME(event, payload): - assert isinstance(event, int,) - assert isinstance(payload, basestring) - - return msgpack.dumps(tuple([event, payload])) - -def CONTROL_UNFRAME(msg): - """ - A status code and a message. - """ - assert isinstance(msg, basestring) - - try: - event, payload = msgpack.loads(msg) - assert isinstance(event, int) - assert isinstance(payload, basestring) - - return event, payload - except TypeError: - raise INVALID_CONTROL_FRAME(msg) - except ValueError: - raise INVALID_CONTROL_FRAME(msg) - -# ----------------------- -# Component State -# ----------------------- - -COMPONENT_TYPE = Enum( - 'SOURCE' , # 0 - 'CONDUIT' , # 1 - 'SINK' , # 2 -) - -COMPONENT_STATE = Enum( - 'OK' , # 0 - 'DONE' , # 1 - 'EXCEPTION' , # 2 -) - -# NOFAILURE - Component is either not running or has not failed -# ALGOEXCEPT - Exception thrown in the given algorithm -# HOSTEXCEPT - Exception thrown on our end. -# INTERRUPT - Manually interuptted by user - -COMPONENT_FAILURE = Enum( - 'NOFAILURE' , - 'ALGOEXCEPT' , - 'HOSTEXCEPT' , - 'INTERRUPT' , -) - -BACKTEST_STATE = Enum( - 'IDLE' , - 'QUEUED' , - 'INPROGRESS' , - 'CANCELLED' , # cancelled ( before natural completion ) - 'EXCEPTION' , # failure ( due to unnatural causes ) - 'DONE' , # done ( naturally completed ) -) - -# ----------------------- -# Datasource Protocol -# ----------------------- - -INVALID_DATASOURCE_FRAME = FrameExceptionFactory('DATASOURCE') - -def DATASOURCE_FRAME(event): - """ - Wraps any datasource payload with id and type, so that unpacking may choose - the write UNFRAME for the payload. - - :param event: ndict with following properties - - - *ds_id* an identifier that is unique to the datasource in the context of a component host (e.g. Simulator) - - *ds_type* a string denoting the datasource type. Must be on of: - - - TRADE - - DONE - - (others to follow soon) - - - *payload* a msgpack string carrying the payload for the frame - """ - assert isinstance(event.source_id, basestring) - assert isinstance(event.type, int), 'Unexpected type %s' % (event.type) - - #datasources will send sometimes send empty msgs to feel gaps - if (event.type == DATASOURCE_TYPE.EMPTY): - return msgpack.dumps(tuple([ - event.type, - event.source_id, - "EMPTY" - ])) - - elif(event.type == DATASOURCE_TYPE.TRADE): - return msgpack.dumps(tuple([ - event.type, - event.source_id, - TRADE_FRAME(event) - ])) - - elif(event.type == DATASOURCE_TYPE.DONE): - return msgpack.dumps(tuple([ - event.type, - event.source_id, - "DONE" - ])) - else: - raise INVALID_DATASOURCE_FRAME(str(event)) - -def DATASOURCE_UNFRAME(msg): - """ - - Extracts payload, and calls correct UNFRAME method based on the - datasource type passed along. - - Returns a dict containing at least: - - - source_id: instance-unique string - - type: datasource type - - dt: None, 'DONE' or a datetime object - - other properties are added based on the datasource type: - - - TRADE - - - sid - int security identifier - - price - float - - volume - int - - dt - a datetime object - - """ - try: - ds_type, source_id, payload = msgpack.loads(msg) - assert isinstance(ds_type, int) - - rval = ndict({'source_id':source_id}) - - if payload == DATASOURCE_TYPE.EMPTY: - child_value = ndict({'dt':None}) - elif(ds_type == DATASOURCE_TYPE.TRADE): - child_value = TRADE_UNFRAME(payload) - elif(ds_type == DATASOURCE_TYPE.DONE): - child_value = ndict({'dt' : 'DONE'}) - else: - raise INVALID_DATASOURCE_FRAME(msg) - - rval.merge(child_value) - return rval - - except TypeError: - raise INVALID_DATASOURCE_FRAME(msg) - except ValueError: - raise INVALID_DATASOURCE_FRAME(msg) - -# ----------------------- -# Feed Protocol -# ----------------------- - -INVALID_FEED_FRAME = FrameExceptionFactory('FEED') - -def FEED_FRAME(event): - """ - :param event: a ndict with at least - - - source_id - - type - - dt - """ - assert isinstance(event, ndict), 'unknown type %s' % str(event) - source_id = event.source_id - ds_type = event.type - PACK_DATE(event) - payload = event.as_dict() - return msgpack.dumps(payload) - -def FEED_UNFRAME(msg): - try: - payload = msgpack.loads(msg) - #TODO: anything we can do to assert more about the content of the dict? - assert isinstance(payload, dict) - rval = ndict(payload) - assert rval.source_id - assert rval.type in DATASOURCE_TYPE - assert rval.dt - UNPACK_DATE(rval) - return rval - except TypeError: - raise INVALID_FEED_FRAME(msg) - except ValueError: - raise INVALID_FEED_FRAME(msg) - -# ----------------------- -# Transform Protocol -# ----------------------- - -INVALID_TRANSFORM_FRAME = FrameExceptionFactory('TRANSFORM') - -def TRANSFORM_FRAME(name, value): - assert isinstance(name, basestring) - if value == None: - return msgpack.dumps(tuple([name, TRANSFORM_TYPE.EMPTY])) - return msgpack.dumps(tuple([name, value])) - -def TRANSFORM_UNFRAME(msg): - """ - :rtype: ndict with : - """ - try: - - name, value = msgpack.loads(msg) - if(value == TRANSFORM_TYPE.EMPTY): - return ndict({name : None}) - #TODO: anything we can do to assert more about the content of the dict? - assert isinstance(name, basestring) - if(name == TRANSFORM_TYPE.PASSTHROUGH): - value = FEED_UNFRAME(value) - - return ndict({name : value}) - except TypeError: - raise INVALID_TRANSFORM_FRAME(msg) - except ValueError: - raise INVALID_TRANSFORM_FRAME(msg) - -# ----------------------- -# Merge Protocol -# ----------------------- -INVALID_MERGE_FRAME = FrameExceptionFactory('MERGE') - -def MERGE_FRAME(event): - """ - :param event: a nameddict with at least: - - - source_id - - type - """ - assert isinstance(event, ndict) - PACK_DATE(event) - payload = event.as_dict() - return msgpack.dumps(payload) - -def MERGE_UNFRAME(msg): - try: - payload = msgpack.loads(msg) - #TODO: anything we can do to assert more about the content of the dict? - assert isinstance(payload, dict) - payload = ndict(payload) - UNPACK_DATE(payload) - return payload - except TypeError: - raise INVALID_MERGE_FRAME(msg) - except ValueError: - raise INVALID_MERGE_FRAME(msg) - - -# ----------------------- -# Trades -# ----------------------- -# -# - Should only be called from inside DATASOURCE_ (UN)FRAME. - -def TRADE_FRAME(event): - """ - :param event: should be a ndict with: - - - ds_id -- the datasource id sending this trade out - - sid -- the security id - - price -- float of the price printed for the trade - - volume -- int for shares in the trade - - dt -- datetime for the trade - - """ - assert isinstance(event, ndict) - assert event.type == DATASOURCE_TYPE.TRADE - assert isinstance(event.sid, int) - for field in PRICE_FIELDS: - assert isinstance(event[field], numbers.Real) - assert isinstance(event.volume, numbers.Integral) - PACK_DATE(event) - return msgpack.dumps(tuple([ - event.sid, - event.price, - event.open, - event.close, - event.high, - event.low, - event.volume, - event.dt, - event.type - ])) - -def TRADE_UNFRAME(msg): - try: - packed = msgpack.loads(msg) - sid, price, open, close, high, low, volume, dt, source_type = packed - - assert isinstance(sid, int) - assert isinstance(price, numbers.Real) - assert isinstance(volume, numbers.Integral) - rval = ndict({ - 'sid' : sid, - 'price' : price, - 'open' : open, - 'close' : close, - 'high' : high, - 'low' : low, - 'volume' : volume, - 'dt' : dt, - 'type' : source_type - }) - UNPACK_DATE(rval) - return rval - except TypeError: - raise INVALID_TRADE_FRAME(msg) - except ValueError: - raise INVALID_TRADE_FRAME(msg) - -# ----------------------- -# Performance and Risk -# ----------------------- - -def PERF_FRAME(perf): - """ - Frame the performance update created at the end of each simulated trading - day. The msgpack is a tuple with the first element statically set to 'PERF'. - Like RISK_FRAME, this method calls BT_UPDATE_FRAME internally, so that - clients can call BT_UPDATE_UNFRAME for all messages from the backtest. - - :param perf: the dictionary created by zipline.trade_client.perf - :rvalue: a msgpack string - """ - - #TODO: add asserts... - - assert isinstance(perf['started_at'], datetime.datetime) - assert isinstance(perf['period_start'], datetime.datetime) - assert isinstance(perf['period_end'], datetime.datetime) - - assert isinstance(perf['daily_perf'], dict) - assert isinstance(perf['cumulative_perf'], dict) - - tp = perf['daily_perf'] - cp = perf['cumulative_perf'] - - assert isinstance(tp['transactions'], list) - # we never want to send transactions for the cumulative period. - # performance.py should never send them, but just to be safe: - assert not cp.has_key('transactions') - assert isinstance(tp['positions'], list) - assert isinstance(cp['positions'], list) - assert isinstance(tp['period_close'], datetime.datetime) - assert isinstance(tp['period_open'], datetime.datetime) - assert isinstance(cp['period_close'], datetime.datetime) - assert isinstance(cp['period_open'], datetime.datetime) - - perf['started_at'] = EPOCH(perf['started_at']) - perf['period_start'] = EPOCH(perf['period_start']) - perf['period_end'] = EPOCH(perf['period_end']) - tp['period_close'] = EPOCH(tp['period_close']) - tp['period_open'] = EPOCH(tp['period_open']) - cp['period_close'] = EPOCH(cp['period_close']) - cp['period_open'] = EPOCH(cp['period_open']) - - tp['transactions'] = convert_transactions(tp['transactions']) - - return BT_UPDATE_FRAME('PERF', perf) - -def convert_transactions(transactions): - results = [] - for txn in transactions: - txn['date'] = EPOCH(txn['dt']) - del(txn['dt']) - results.append(txn) - return results - -def RISK_FRAME(risk): - return BT_UPDATE_FRAME('RISK', risk) - -def EXCEPTION_FRAME(exception_tb, name, message): - stack_list = traceback.extract_tb(exception_tb) - rlist = [] - for stack in stack_list: - filename = shorten_filename(stack[0]) - # default the line to empty string rather than None - line = '' - if stack[3]: - line = stack[3] - rstack = { - 'filename' : filename, - 'lineno' : stack[1], - 'method' : stack[2], - 'line' : line - } - rlist.append(rstack) - result = { - 'date' : epoch_now(), - 'stack' : rlist, - 'name' : name, - 'message' : message - } - - return BT_UPDATE_FRAME('EXCEPTION', result) - -def shorten_filename(filename): - if filename == None: - return None - - # check if the path contains zipeline - # looks for a zipline directory in the middle of the path - # this will work on - # /zipline/workspace/zipline/core/component.py, but fail for - # /home/fawce/projects/zipline/zipline/core/component.py - path_re = r'(.*)(/zipline/.*[.]py)' - match = re.search(path_re, filename) - - if match and match.lastindex == 2: - filepath = match.group(2) - return os.path.join('/zipline',filepath) - else: - # return just the filename. - head, tail = os.path.split(filename) - return tail - -def CANCEL_FRAME(date): - result = { - 'date' : EPOCH(date) - } - - return BT_UPDATE_FRAME('CANCEL', result) - -def DONE_FRAME(msg): - assert isinstance(msg, basestring), \ - "Done message must be a string." - - return BT_UPDATE_FRAME('DONE', msg) - - -def BT_UPDATE_FRAME(prefix, payload): - """ - Frames prepared by RISK_FRAME and PERF_FRAME methods are sent via the same - socket. This method provides a prefix to allow for muxing the messages - onto a single socket. - """ - return msgpack.dumps(tuple([prefix, payload])) - -def BT_UPDATE_UNFRAME(msg): - """ - Risk, Perf, and LOG framing methods prefix the payload with - a shorthand for their type. That way, all messages received from the socket - can be PERF_FRAMED(), whether they are risk, perf, or log. - """ - prefix, payload = msgpack.loads(msg, use_list=True) - return dict(prefix=prefix, payload=payload) - -# ----------------------- -# Date Helpers -# ----------------------- - -def PACK_DATE(event): - """ - Packs the datetime property of event into msgpack'able longs. - This function should be called purely for its side effects. - The event's 'dt' property is replaced by a tuple of integers - - - year, month, day, hour, minute, second, microsecond - - PACK_DATE and UNPACK_DATE are inverse operations. - - :param event: event must a ndict with a property named 'dt' that is a datetime. - :rtype: None - """ - assert isinstance(event.dt, datetime.datetime) - # utc only please - assert event.dt.tzinfo == pytz.utc - event['dt'] = date_to_tuple(event['dt']) - -def date_to_tuple(dt): - year, month, day, hour, minute, second = dt.timetuple()[0:6] - micros = dt.microsecond - return tuple([year, month, day, hour, minute, second, micros]) - -def UNPACK_DATE(event): - """ - Unpacks the datetime property of event from msgpack'able longs. - This function should be called purely for its side effects. - The event's 'dt' property is converted to a datetime by reading and then - combining a tuple of integers. - - UNPACK_DATE and PACK_DATE are inverse operations. - - :param tuple event: event must a ndict with: - - - a property named 'dt_tuple' that is a tuple of integers \ - representing the date and time in UTC. - - dt_tuple must have year, month, day, hour, minute, second, and microsecond - - :rtype: None - """ - assert isinstance(event.dt, tuple) - assert len(event.dt) == 7 - for item in event.dt: - assert isinstance(item, numbers.Integral) - event.dt = tuple_to_date(event.dt) - -def tuple_to_date(date_tuple): - year, month, day, hour, minute, second, micros = date_tuple - dt = datetime.datetime(year, month, day, hour, minute, second) - dt = dt.replace(microsecond = micros, tzinfo = pytz.utc) - return dt +from utils.protocol_utils import Enum, ndict, namelookup # Datasource type should completely determine the other fields of a # message with its type. @@ -698,58 +34,3 @@ SIMULATION_STYLE = Enum( 'FIXED_SLIPPAGE', 'NOOP' ) - -#Global variables for the fields we extract out of a standard logbook record. -LOG_FIELDS = set(['func_name', 'lineno', 'time', 'msg',\ - 'level', 'channel', ]) -LOG_EXTRA_FIELDS = set(['algo_dt',]) -LOG_DONE = "DONE" - -def LOG_FRAME(payload): - """ - Expects a dictionary of the form: - { - 'algo_dt' : 1199223000, #Algo simulation date. - 'time' : 1199223001, #Realtime date of log creation. - 'func_name' : 'foo', - 'lineno' : 46, - 'msg' : 'Successfully disintegrated llama #3', - 'level' : 4, #Logbook enum - 'channel' : 'MyLogger' - } - - Frame checks that we have all expected fields and exports an - event/payload dict as JSON. - """ - - assert isinstance(payload, dict), \ - "LOG_FRAME expected a dict" - - assert payload.has_key('algo_dt'), \ - "LOG_FRAME with no algo_dt" - assert payload.has_key('time'), \ - "LOG_FRAME with no time" - assert payload.has_key('channel'),\ - "LOG_FRAME with no channel" - assert payload.has_key('level'),\ - "LOG_FRAME with no level" - assert payload.has_key('msg'),\ - "LOG_FRAME with no message" - - # truncation will only work with strings and msgpack will - # preserve primitives. - payload['msg'] = str(payload['msg']) - - return BT_UPDATE_FRAME('LOG', payload) - -def LOG_UNFRAME(msg): - """ - msg should be a tuple of ('LOG',dict) - """ - record = msgpack.loads(msg) - assert isinstance(record, tuple) - assert len(record) == 2 - assert record[0] == 'LOG' - payload = record[1] - - return payload diff --git a/zipline/test_algorithms.py b/zipline/test_algorithms.py index eb883469..62c78ffb 100644 --- a/zipline/test_algorithms.py +++ b/zipline/test_algorithms.py @@ -229,7 +229,8 @@ class InitializeTimeoutAlgorithm(): def initialize(self): import time from zipline.gens.tradesimulation import INIT_TIMEOUT - time.sleep(INIT_TIMEOUT + 1) + time.sleep(INIT_TIMEOUT + 1000) + def set_order(self, order_callable): pass @@ -245,7 +246,7 @@ class InitializeTimeoutAlgorithm(): def get_sid_filter(self): return [self.sid] - + class TooMuchProcessingAlgorithm(): def __init__(self, sid): self.sid = sid @@ -270,7 +271,7 @@ class TooMuchProcessingAlgorithm(): def get_sid_filter(self): return [self.sid] - + class TimeoutAlgorithm(): def __init__(self, sid): diff --git a/zipline/utils/date_utils.py b/zipline/utils/date_utils.py index 2819d4e9..8e2aeaaa 100644 --- a/zipline/utils/date_utils.py +++ b/zipline/utils/date_utils.py @@ -3,7 +3,6 @@ from collections import namedtuple import time import pytz import iso8601 -import calendar from dateutil import rrule from datetime import datetime, date, timedelta from dateutil.relativedelta import * @@ -141,3 +140,9 @@ def date_to_datetime(t): dt = datetime.fromordinal(t.toordinal()) dt = dt.replace(tzinfo = pytz.utc) return dt + +def tuple_to_date(date_tuple): + year, month, day, hour, minute, second, micros = date_tuple + dt = datetime(year, month, day, hour, minute, second) + dt = dt.replace(microsecond = micros, tzinfo = pytz.utc) + return dt diff --git a/zipline/utils/factory.py b/zipline/utils/factory.py index cf2168fb..7146c0de 100644 --- a/zipline/utils/factory.py +++ b/zipline/utils/factory.py @@ -9,8 +9,10 @@ from os.path import join, abspath, dirname from operator import attrgetter from datetime import datetime, timedelta +from zipline.utils.date_utils import tuple_to_date +from zipline.utils.protocol_utils import ndict + import zipline.finance.risk as risk -import zipline.protocol as zp from zipline.gens.tradegens import RandomEquityTrades from zipline.gens.tradegens import SpecificEquityTrades @@ -33,7 +35,7 @@ def load_market_data(): bm_list = msgpack.loads(fp_bm.read()) bm_returns = [] for packed_date, returns in bm_list: - event_dt = zp.tuple_to_date(packed_date) + event_dt = tuple_to_date(packed_date) #event_dt = event_dt.replace( # hour=0, # minute=0, @@ -49,7 +51,7 @@ def load_market_data(): tr_list = msgpack.loads(fp_tr.read()) tr_curves = {} for packed_date, curve in tr_list: - tr_dt = zp.tuple_to_date(packed_date) + tr_dt = tuple_to_date(packed_date) #tr_dt = tr_dt.replace(hour=0, minute=0, second=0, tzinfo=pytz.utc) tr_curves[tr_dt] = curve @@ -95,7 +97,7 @@ def create_trade_history(sid, prices, amounts, interval, trading_calendar): return trades def create_txn(sid, price, amount, datetime, btrid=None): - txn = zp.ndict({ + txn = ndict({ 'sid' : sid, 'amount' : amount, 'dt' : datetime, diff --git a/zipline/utils/log_utils.py b/zipline/utils/log_utils.py index ea1abf18..c7d7b9b9 100644 --- a/zipline/utils/log_utils.py +++ b/zipline/utils/log_utils.py @@ -1,20 +1,10 @@ import logbook -import zmq -import pytz -import datetime - -from logbook import NOTSET -from logbook.handlers import Handler, FileHandler - -from zipline.protocol import LOG_FRAME, LOG_FIELDS, \ - LOG_EXTRA_FIELDS - from contextlib import contextmanager log = logbook.Logger("LogUtils") -class redirecter(object): +class redirector(object): def __init__(self, logger, name): self.logger = logger self.buffer = bytes() @@ -33,7 +23,7 @@ class redirecter(object): self.logger.error(out_form) self.buffer = bytes() -class log_redirecter(object): +class log_redirector(object): def __init__(self, logger): self.logger = logger @@ -57,8 +47,8 @@ def stdout_pipe(logger, pipe_name): import sys orig_fds = sys.stdout, sys.stderr - sys.stderr = redirecter(logger, pipe_name) - sys.stdout = redirecter(logger, pipe_name) + sys.stderr = redirector(logger, pipe_name) + sys.stdout = redirector(logger, pipe_name) yield sys.stderr.flush() @@ -72,95 +62,8 @@ def stdout_only_pipe(logger, pipe_name): """ import sys orig_fd = sys.stdout - sys.stdout = log_redirecter(logger) + sys.stdout = log_redirector(logger) yield sys.stdout.flush() sys.stdout = orig_fd - -class ZeroMQLogHandler(Handler): - """ - A handler that takes messages captured from the user algorithm stdout - and transforms them into LOG_FRAMES suitable for database storage. - Setup is similar to logbook.queues.ZeroMQHandler, except we connect - instead of binding and we extract record fields into a dict. - """ - - def __init__(self, socket=None, level=NOTSET, filter=None, bubble=False, - context=None, fds = LOG_FIELDS, extra_fds = LOG_EXTRA_FIELDS): - Handler.__init__(self, level, filter, bubble) - - try: - import zmq - except ImportError: - raise RuntimeError('The pyzmq library is required for ' - 'the ZeroMQHandler.') - #: the zero mq context - self.context = context - #: the zero mq socket. - self.socket = socket #self.context.socket(zmq.PUSH) - - #self.uri = uri - #if uri is not None: - # self.socket.connect(uri) - - self.fds = fds - self.extra_fds = extra_fds - - def export_record(self, record): - """ - Extract relevant fields from a log record, fiddling with datetime - fields to make json happy. - """ - from zipline.utils.date_utils import EPOCH - - #Needed to extract record info from dictionary. - record.pull_information() - - #Logbook stores record times as datetime objects, which - #can't be serialized by JSON, so we need to convert to - #unix epoch representation. - - - #Do the same if algo_dt is a datetime object. - if record.extra.has_key('algo_dt'): - algo_dt = record.extra['algo_dt'] - - if isinstance(algo_dt, datetime.datetime): - algo_dt = EPOCH(algo_dt.replace(tzinfo = pytz.utc)) - record.extra['algo_dt'] = algo_dt - - data = {} - - #Extract all the fields we care about from LogRecord's internal - #dictionary. - - for field in iter(self.fds): - if record.__dict__.has_key(field): - data[field] = record.__dict__[field] - else: - data[field] = None - - for field in iter(self.extra_fds): - if record.extra.has_key(field): - data[field] = record.extra[field] - else: - data[field] = None - - if data['time']: - assert isinstance(data['time'], datetime.datetime) - - time = data['time'].replace(tzinfo = pytz.utc) - #logbook measures time in utc already, no need to convert. - data['time'] = EPOCH(time) - - return data - - def emit(self, record): - """Extract relevant fields and send info as JSON over a zmq socket.""" - payload = self.export_record(record) - self.socket.send(LOG_FRAME(payload)) - - def close(self): - pass - #self.socket.close() diff --git a/zipline/utils/protocol_utils.py b/zipline/utils/protocol_utils.py index c3ae9352..62f3aab9 100644 --- a/zipline/utils/protocol_utils.py +++ b/zipline/utils/protocol_utils.py @@ -5,7 +5,7 @@ from collections import MutableMapping def Enum(*options): """ - Fast enums are very important when we want really tight zmq + Fast enums are very important when we want really tight loops. These are probably going to evolve into pure C structs anyways so might as well get going on that. """ diff --git a/zipline/utils/test_utils.py b/zipline/utils/test_utils.py index 513352b1..9ec7326c 100644 --- a/zipline/utils/test_utils.py +++ b/zipline/utils/test_utils.py @@ -1,7 +1,4 @@ import multiprocessing -import zmq -import time -import zipline.protocol as zp from datetime import datetime import blist from zipline.utils.date_utils import EPOCH @@ -65,69 +62,24 @@ def check(test, a, b, label=None): test.assertEqual(a, b, "mismatch on path: " + label) -def drain_zipline(test, zipline, p_blocking=False): - assert test.ctx, "method expects a valid zmq context" - assert test.zipline_test_config, "method expects a valid test config" - assert isinstance(test.zipline_test_config, dict) - assert test.zipline_test_config['results_socket_uri'], \ - "need to specify a socket address for logs/perf/risk" - test.receiver = create_receiver( - test.zipline_test_config['results_socket_uri'], - test.ctx - ) - # Bind and connect are asynch, so allow time for bind before - # starting the zipline (TSC connects internally). - time.sleep(1) - - # start the simulation - zipline.simulate(blocking=p_blocking) - output, transaction_count = drain_receiver(test.receiver) - # some processes will exit after the message stream is - # finished. We block here to avoid collisions with subsequent - # ziplines. - zipline.join() - - return output, transaction_count - - -def create_receiver(socket_addr, ctx): - receiver = ctx.socket(zmq.PULL) - receiver.bind(socket_addr) - - return receiver - - -def drain_receiver(receiver, count=None): +def drain_zipline(test, zipline): output = [] transaction_count = 0 msg_counter = 0 - while True: - msg = receiver.recv() + # start the simulation + for update in zipline: msg_counter += 1 - update = zp.BT_UPDATE_UNFRAME(msg) output.append(update) - if update['prefix'] == 'PERF': + if 'daily_perf' in update: transaction_count += \ - len(update['payload']['daily_perf']['transactions']) - elif update['prefix'] == 'EXCEPTION': - break - elif update['prefix'] == 'DONE': - break - - if count and msg_counter >= count: - break - - receiver.close() - del receiver + len(update['daily_perf']['transactions']) return output, transaction_count -def assert_single_position(test, zipline, blocking=False): - output, transaction_count = drain_zipline(test, - zipline, - p_blocking=blocking) - test.assertEqual(output[-1]['prefix'], 'DONE') +def assert_single_position(test, zipline): + + output, transaction_count = drain_zipline(test, zipline) test.assertEqual( test.zipline_test_config['order_count'], @@ -137,8 +89,7 @@ def assert_single_position(test, zipline, blocking=False): # the final message is the risk report, the second to # last is the final day's results. Positions is a list of # dicts. - perfs = [x for x in output if x['prefix'] == 'PERF'] - closing_positions = perfs[-2]['payload']['daily_perf']['positions'] + closing_positions = output[-2]['daily_perf']['positions'] test.assertEqual( len(closing_positions), @@ -154,18 +105,6 @@ def assert_single_position(test, zipline, blocking=False): ) -def launch_component(component): - proc = multiprocessing.Process(target=component.run) - proc.start() - return proc - - -def launch_monitor(monitor): - proc = multiprocessing.Process(target=monitor.run) - proc.start() - return proc - - class ExceptionSource(object): def __init__(self):