From a2c4babd93cd0b4a025fd64bb06454f3f0e99e99 Mon Sep 17 00:00:00 2001 From: fawce Date: Fri, 16 Mar 2012 17:54:34 -0400 Subject: [PATCH 1/3] hooking perf to the on_done callback. --- zipline/finance/trading.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index 6c3d1318..e6119a8e 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -29,6 +29,8 @@ class TradeSimulationClient(qmsg.Component): ) self.perf = perf.PerformanceTracker(self.trading_environment) + self.on_done = self.perf.handle_simulation_end + @property def get_id(self): From 3af850948cd760f769adb3fbf7800a12767916d2 Mon Sep 17 00:00:00 2001 From: fawce Date: Fri, 16 Mar 2012 17:56:53 -0400 Subject: [PATCH 2/3] fixed documentation on event callback method. --- zipline/finance/trading.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index e6119a8e..a7d0e65b 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -39,7 +39,9 @@ class TradeSimulationClient(qmsg.Component): def add_event_callback(self, callback): """ :param callable callback: must be a function with the signature - f(frame). + f(event), where event is a namedict whose properties depend on the + upstream configuration of the zipline. It will include datasource and + transformations. """ self.event_callbacks.append(callback) From f581be37237050f51893b55c4cf449741c8c3ea0 Mon Sep 17 00:00:00 2001 From: Stephen Diehl Date: Fri, 16 Mar 2012 18:14:23 -0400 Subject: [PATCH 3/3] Axe test_messaging. --- zipline/test/test_messaging.py | 272 --------------------------------- 1 file changed, 272 deletions(-) delete mode 100644 zipline/test/test_messaging.py diff --git a/zipline/test/test_messaging.py b/zipline/test/test_messaging.py deleted file mode 100644 index c901f984..00000000 --- a/zipline/test/test_messaging.py +++ /dev/null @@ -1,272 +0,0 @@ -""" -Test suite for the messaging infrastructure of QSim. -""" -#don't worry about excessive public methods pylint: disable=R0904 - -from collections import defaultdict - -from zipline.transforms.technical import MovingAverage -from zipline.sources import RandomEquityTrades - -from zipline.test.client import TestClient -from zipline.test.transform import DivideByZeroTransform - -from nose.tools import timed - -# Should not inherit form TestCase since test runners will pick -# it up as a test. Its a Mixin of sorts at this point. -class SimulatorTestCase(object): - - # Leased sockets is a defaultdict keyed by the test case. - # This lets you debug the sockets being allocated in the - # specific test cases and tear them down appropriately. - # - # { - # 'test_orders' : ['tcp : //127.0.0.1 : 1000', ... ], - # 'test_performance' : ['tcp : //127.0.0.1 : 1025', ... ], - # } - - leased_sockets = defaultdict(list) - - def setUp(self): - self.setup_logging() - - # TODO: how to make Nose use this cross-process???? - self.setup_allocator() - - def tearDown(self): - pass - #self.unallocate_sockets() - - # Assert the sockets were properly cleaned up - #self.assertEmpty(self.leased_sockets[self.id()].values()) - - # Assert they were returned to the heap - #self.allocator.socketheap.assert - - def get_simulator(self): - """ - Return a new simulator instance to be tested. - """ - raise NotImplementedError - - def get_controller(self): - """ - Return a new controler for simulator instance to be tested. - """ - raise NotImplementedError - - def setup_allocator(self): - """ - Setup the socket allocator for this test case. - """ - raise NotImplementedError - - def allocate_sockets(self, n): - """ - Allocate sockets local to this test case, track them so - we can gc after test run. - """ - - assert isinstance(n, int) - assert n > 0 - - leased = self.allocator.lease(n) - - self.leased_sockets[self.id()].extend(leased) - return leased - - def unallocate_sockets(self): - self.allocator.reaquire(*self.leased_sockets[self.id()]) - - # ------- - # Cases - # ------- - - @timed(2) - def test_simple(self): - - # Simple test just to make sure that the archiecture is - # responding. - - # Base Simuation - # -------------- - - # Allocate sockets for the simulator components - sockets = self.allocate_sockets(5) - - addresses = { - 'sync_address' : sockets[0], - 'data_address' : sockets[1], - 'feed_address' : sockets[2], - 'merge_address' : sockets[3], - 'result_address' : sockets[4] - } - - sim = self.get_simulator(addresses) - con = self.get_controller() - - # Simulation Components - # --------------------- - - ret1 = RandomEquityTrades(133, "ret1", 1) - ret2 = RandomEquityTrades(134, "ret2", 1) - client = TestClient() - - sim.register_controller( con ) - sim.register_components([ret1, ret2, client]) - - # Simulation - # ---------- - sim_context = sim.simulate() - sim_context.join() - - # Stop Running - # ------------ - - self.assertTrue(sim.ready()) - self.assertFalse(sim.exception) - - self.assertEqual(sim.feed.pending_messages(), 0, - "The feed should be drained of all messages, found {n} remaining." - .format(n=sim.feed.pending_messages()) - ) - - def test_simplefail(self): - - # Simple test just to make sure that the archiecture is - # responding. - - # Base Simuation - # -------------- - - # Allocate sockets for the simulator components - sockets = self.allocate_sockets(5) - - addresses = { - 'sync_address' : sockets[0], - 'data_address' : sockets[1], - 'feed_address' : sockets[2], - 'merge_address' : sockets[3], - 'result_address' : sockets[4] - } - - sim = self.get_simulator(addresses) - con = self.get_controller() - - # Simulation Components - # --------------------- - - ret1 = RandomEquityTrades(133, "ret1", 1) - ret2 = RandomEquityTrades(134, "ret2", 1) - fail_transform = DivideByZeroTransform("fail") - client = TestClient() - - sim.register_controller( con ) - sim.register_components([ret1, ret2, fail_transform, client]) - - # Simulation - # ---------- - sim_context = sim.simulate() - sim_context.join() - - # Stop Running - # ------------ - - self.assertTrue(fail_transform.exception) - self.assertFalse(fail_transform.successful()) - - self.assertEqual(sim.feed.pending_messages(), 0, - "The feed should be drained of all messages, found {n} remaining." - .format(n=sim.feed.pending_messages()) - ) - - def test_sources_only(self): - - # Base Simuation - # -------------- - - # Allocate sockets for the simulator components - sockets = self.allocate_sockets(5) - - addresses = { - 'sync_address' : sockets[0], - 'data_address' : sockets[1], - 'feed_address' : sockets[2], - 'merge_address' : sockets[3], - 'result_address' : sockets[4] - } - - sim = self.get_simulator(addresses) - con = self.get_controller() - - # Simulation Components - # --------------------- - - ret1 = RandomEquityTrades(133, "ret1", 400) - ret2 = RandomEquityTrades(134, "ret2", 400) - client = TestClient() - - sim.register_controller( con ) - sim.register_components([ret1, ret2, client]) - - # Simulation - # ---------- - sim_context = sim.simulate() - sim_context.join() - - # Stop Running - # ------------ - self.assertTrue(sim.ready()) - self.assertFalse(sim.exception) - - self.assertEqual(sim.feed.pending_messages(), 0, - "The feed should be drained of all messages, found {n} remaining." - .format(n=sim.feed.pending_messages()) - ) - - def test_transforms(self): - - # Base Simuation - # -------------- - - # Allocate sockets for the simulator components - sockets = self.allocate_sockets(5) - - addresses = { - 'sync_address' : sockets[0], - 'data_address' : sockets[1], - 'feed_address' : sockets[2], - 'merge_address' : sockets[3], - 'result_address' : sockets[4] - } - - sim = self.get_simulator(addresses) - con = self.get_controller() - - # Simulation Components - # --------------------- - - ret1 = RandomEquityTrades(133, "ret1", 5000) - ret2 = RandomEquityTrades(134, "ret2", 5000) - mavg1 = MovingAverage("mavg1", 30) - mavg2 = MovingAverage("mavg2", 60) - client = TestClient() - - sim.register_components([ret1, ret2, mavg1, mavg2, client]) - sim.register_controller( con ) - - # Simulation - # ---------- - sim_context = sim.simulate() - sim_context.join() - - # Stop Running - # ------------ - self.assertTrue(sim.ready()) - self.assertFalse(sim.exception) - - self.assertEqual(sim.feed.pending_messages(), 0, - "The feed should be drained of all messages, found {n} remaining." - .format(n=sim.feed.pending_messages()) - )