From cf31f08af12f548bbd2b93c586f2b54b3f3424eb Mon Sep 17 00:00:00 2001 From: fawce Date: Sat, 11 Feb 2012 23:20:45 -0500 Subject: [PATCH] created simulator, re-wrote tests. tests are failing, but without errors --- qsim/config.py | 54 ----- qsim/core.py | 439 +++++++++++++++++++++++++---------- qsim/messaging.py | 30 +-- qsim/sources.py | 25 +- qsim/test/client.py | 16 +- qsim/test/test_messaging.py | 89 +++---- qsim/transforms/core.py | 232 ------------------ qsim/transforms/technical.py | 36 ++- 8 files changed, 392 insertions(+), 529 deletions(-) delete mode 100644 qsim/config.py delete mode 100644 qsim/transforms/core.py diff --git a/qsim/config.py b/qsim/config.py deleted file mode 100644 index fa2b9a9f..00000000 --- a/qsim/config.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Tools for managing configuration data for sources and transforms.""" - -import json - -class Config(object): - """ Name/Value configuration object with type-safe accessors and json serialization/deserialization.""" - - def __init__(self, props): - self.store = props - - def __setitem__(self, key, value): - self.store[key] = value - - def __getitem__(self, key): - if self.store.has_key(key): - return self.store[key] - - def __getattr__(self, attrname): - if self.store.has_key(attrname): - return self.store[attrname] - else: - raise AttributeError("No attribute named {name}".format(name=attrname)) - - def get_integer(self, name, default=0): - """get the named config property as an integer""" - return self.get_value(name, default, type(1)) - - def get_string(self, name, default=''): - """get the named config property as a string""" - return self.get_value(name, default, type('')) - - def get_float(self, name, default=0.0): - """get the named config property as a float""" - return self.get_value(name, default, type(1.0)) - - def get_value(self, name, default, expected_type): - """ - return the named config property as the expected_type. 
- if the property is missing, or is not of the right type, return default. - """ - if(self.store.has_key(name)): - val = self.store[name] - if isinstance(val, expected_type): - return val - else: - return default - - def to_json(self): - """convert this config to a json string""" - return json.dumps(self.store) - - def from_json(self, json_string): - """parse a json string into this config object's properties""" - self.store = json.loads(json_string) \ No newline at end of file diff --git a/qsim/core.py b/qsim/core.py index 50924da9..39665d32 100644 --- a/qsim/core.py +++ b/qsim/core.py @@ -1,174 +1,369 @@ """ Provides simulated data feed services. """ - -import qsim.sources as sources -import qsim.util as qutil -import qsim.messaging as qmsg -import qsim.transforms.technical as ta - +import multiprocessing import zmq import time import logging import json +import copy + +import qsim.sources as sources +import qsim.util as qutil +import qsim.messaging as qmsg class Simulator(object): """ - Simulator translates configuration data into running source, feed, transform, and merge components. + Simulator coordinates the launch and communication of source, feed, transform, and merge components. """ - def __init__(self, config): + def __init__(self, sources, transforms, client): """ - :config: a qsim.config.Config object that contains configuration information for all datasources, all transforms, and all - client algorithms that simulator should create. 
""" - self.config = config - self.data_workers = {} - + self.sources = sources + self.transforms = transforms + self.client = client + self.sync_register = {} + self.sync_address = "tcp://127.0.0.1:{port}".format(port=10100) + self.data_address = "tcp://127.0.0.1:{port}".format(port=10101) + self.feed_address = "tcp://127.0.0.1:{port}".format(port=10102) + self.merge_address = "tcp://127.0.0.1:{port}".format(port=10103) + self.result_address = "tcp://127.0.0.1:{port}".format(port=10104) def launch(self): - """ - Create all components specified in config... - """ - self.feed = DataFeed(self.config.sources.keys()) - self.start_data_sources(self.config.sources) - self.create_transforms(self.config.transforms) - - def start_data_sources(self, configs): - """ - :configs: array of dicts with properties - """ - for name, info in configs.iteritems(): - if(info['class'] == "EquityMinuteTrades"): - emt = EquityMinuteTrades(info['sid'], self.feed, name) - self.data_workers[name] = emt - elif(info['class'] == "RandomEquityTrades"): - ret = sources.RandomEquityTrades(info['sid'], self.feed, name, info['count']) - self.data_workers[name] = ret - - qutil.LOGGER.info("starting {id}".format(id=source_id)) - self.data_workers[name].start() - + self.feed = DataFeed(self.sources.keys(), self.data_address, self.feed_address, qmsg.Sync(self,"DataFeed")) + self.launch_component("DataFeed", self.feed) + for name, data_source in self.sources.iteritems(): + data_source.data_address = self.data_address + data_source.sync = qmsg.Sync(self, str(data_source.source_id)) + self.launch_component(name, data_source) qutil.LOGGER.info("datasources processes launched") - - def start_transforms(self, configs): - """ - :configs: Must be an array of dicts holding properties needed for each transform. See the classes in :py:module:`qsim.transforms` - Create transforms based on configs, set each transform's result address to - transforms_address. 
Each transform will connect to transforms_address that all transformed events will be PUSH'd - to this object. - """ - self.transforms = {} - for props in configs: - class_name = props['class'] - if(class_name == 'MovingAverage'): - mavg = ta.MovingAverage(self.feed, props, self.transform_address) - self.transforms[mavg.config.name] = mavg - - keys = copy.copy(self.transforms.keys()) - keys.append("feed") #for the raw feed - self.data_buffer = qmsg.MergedParallelBuffer(keys) - - self.buffers = {} + + #connect all the transforms to the feed and merge for name, transform in self.transforms.iteritems(): - self.buffers[name] = [] - qutil.LOGGER.info("starting {name}".format(name=name)) - proc = multiprocessing.Process(target=transform.run) - proc.start() - - -class DataFeed(object): + transform.feed_address = self.feed_address #connect transform to receive feed. + transform.merge_address = self.merge_address #connect transform to push results to merge + transform.sync = qmsg.Sync(self,name) #synchronize the transform against this simulation. 
+ self.launch_component(name, transform) #start transforms + + #connect merge to feed, set expected transforms + self.merge = TransformsMerge(self.feed_address, + self.merge_address, + self.result_address, + qmsg.Sync(self,"TransformsMerge"), + self.transforms.keys()) + + self.launch_component("transforms merge", self.merge) + qutil.LOGGER.info("transform processes launched") + + #connect client to merged feed + self.client.address = self.result_address + self.client.sync = qmsg.Sync(self,"Client") + client_proc = self.launch_component("client", self.client) + qutil.LOGGER.info("client process launched") + + self.sync_components() + client_proc.join() #wait for client to complete processing + + def launch_component(self, name, component): + qutil.LOGGER.info("starting {name}".format(name=name)) + proc = multiprocessing.Process(target=component.run) + proc.start() + return proc - def __init__(self, source_list): - """ - :source_list: list of source IDs - """ - - self.data_address = "tcp://127.0.0.1:{port}".format(port=10101) - self.sync_address = "tcp://127.0.0.1:{port}".format(port=10102) - self.feed_address = "tcp://127.0.0.1:{port}".format(port=10103) - - self.client_register = {} - - self.data_buffer = qmsg.ParallelBuffer(source_id_list) - def register_sync(self, sync_id): - self.client_register[sync_id] = "UNCONFIRMED" - + self.sync_register[sync_id] = "UNCONFIRMED" + def registration_complete(self): - for sync_id, status in self.client_register.iteritems(): + for sync_id, status in self.sync_register.iteritems(): if status == "UNCONFIRMED": return False - + return True - - def sync_clients(self): + + def sync_components(self): # Socket to receive signals + self.context = zmq.Context() qutil.LOGGER.info("waiting for all datasources and clients to be ready") self.syncservice = self.context.socket(zmq.REP) self.syncservice.bind(self.sync_address) - + while not self.registration_complete(): # wait for synchronization request msg = self.syncservice.recv() - 
self.client_register[msg] = "CONFIRMED" + self.sync_register[msg] = "CONFIRMED" #qutil.LOGGER.info("confirmed {id}".format(id=msg)) # send synchronization reply self.syncservice.send('CONFIRMED') - + self.syncservice.close() qutil.LOGGER.info("sync'd all datasources and clients") - + + + +class DataFeed(object): + + def __init__(self, source_list, data_address, feed_address, sync): + """ + :source_list: list of data source IDs + """ + self.feed_address = feed_address + self.data_address = data_address + self.data_buffer = qmsg.ParallelBuffer(source_list) + self.sync = sync + def run(self): # Prepare our context and sockets - self.context = zmq.Context() + try: + self.context = zmq.Context() - ds_finished_counter = 0 + ds_finished_counter = 0 - #create the data sink. Based on http://zguide.zeromq.org/py:tasksink2 - #see: http://zguide.zeromq.org/py:taskwork2 - self.data_socket = self.context.socket(zmq.PULL) - self.data_socket.bind(self.data_address) + #create the data sink. Based on http://zguide.zeromq.org/py:tasksink2 + #see: http://zguide.zeromq.org/py:taskwork2 + self.data_socket = self.context.socket(zmq.PULL) + self.data_socket.bind(self.data_address) - #create the feed - self.feed_socket = self.context.socket(zmq.PUB) - self.feed_socket.bind(self.feed_address) + #create the feed + self.feed_socket = self.context.socket(zmq.PUB) + self.feed_socket.bind(self.feed_address) - self.data_buffer.out_socket = self.feed_socket + self.data_buffer.out_socket = self.feed_socket - #wait for all feed subscribers - self.sync_clients() + self.sync.confirm() + qutil.LOGGER.info("entering feed loop on {addr}".format(addr=self.data_address)) - qutil.LOGGER.info("entering feed loop on {addr}".format(addr=self.data_address)) - - while True: - message = self.data_socket.recv() - event = json.loads(message) - if(event["type"] == "DONE"): - ds_finished_counter += 1 - if(len(self.data_workers) == ds_finished_counter): - break - else: - self.data_buffer.append(event[u's'], event) - 
self.data_buffer.send_next() + while True: + message = self.data_socket.recv() + event = json.loads(message) + if(event["type"] == "DONE"): + ds_finished_counter += 1 + if(len(self.data_buffer) == ds_finished_counter): + break + else: + self.data_buffer.append(event[u's'], event) + self.data_buffer.send_next() + #drain any remaining messages in the buffer + self.data_buffer.drain() + + #send the DONE message + self.feed_socket.send("DONE") + qutil.LOGGER.info("received {n} messages, sent {m} messages".format(n=self.data_buffer.received_count, + m=self.data_buffer.sent_count)) + except: + qutil.LOGGER.exception("Exception in Feed, attempting to close.") + finally: + self.data_socket.close() + self.feed_socket.close() + self.context.term() + + + +class BaseTransform(object): + """Parent class for feed transforms. Subclass and override transform + method to create a new derived value from the combined feed.""" + + def __init__(self, name): + """ + """ + + self.feed_address = None + self.merge_address = None + self.state = {} + self.state['name'] = name + self.sync = None + self.received_count = 0 + self.sent_count = 0 + self.context = None + + def run(self): + """Top level execution entry point for the transform:: + + - connects to the feed socket to subscribe to events + - connets to the result socket (most oftened bound by a TransformsMerge) to PUSH transforms + - processes all messages received from feed, until DONE message received + - pushes all transforms + - sends DONE to result socket, closes all sockets and context""" + try: + self.open() + self.process_all() + except: + qutil.LOGGER.exception("Exception during merge processing, attempting to close merge.") + finally: + self.close() + + def open(self): + """ + Establishes zmq connections. + """ + self.context = zmq.Context() + + qutil.LOGGER.info("starting {name} transform". + format(name = self.state['name'])) + #create the feed SUB. 
+ self.feed_socket = self.context.socket(zmq.SUB) + self.feed_socket.connect(self.feed_address) + self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') + + #create the result PUSH + self.result_socket = self.context.socket(zmq.PUSH) + self.result_socket.connect(self.merge_address) + + def process_all(self): + """ + Loops until feed's DONE message is received: + - receive an event from the data feed + - call transform (subclass' method) on event + - send the transformed event + """ + qutil.LOGGER.info("starting {name} event loop".format(name = self.state['name'])) + self.sync.confirm() + + while True: + message = self.feed_socket.recv() + if(message == "DONE"): + qutil.LOGGER.info("{name} received the Done message from the feed".format(name=self.state['name'])) + self.result_socket.send("DONE") + break; + self.received_count += 1 + event = json.loads(message) + cur_state = self.transform(event) + cur_state['dt'] = event['dt'] + cur_state['name'] = self.state['name'] + self.result_socket.send(json.dumps(cur_state)) + self.sent_count += 1 + + def close(self): + """ + Shut down zmq resources. + """ + qutil.LOGGER.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count)) + + self.feed_socket.close() + self.result_socket.close() + self.context.term() + + def transform(self, event): + """ Must return the transformed value as a map with {name:"name of new transform", value: "value of new field"} + Transforms run in parallel and results are merged into a single map, so transform names must be unique. + Best practice is to use the self.state object initialized from the transform configuration, and only set the + transformed value: + self.state['value'] = transformed_value + """ + return {} + +class TransformsMerge(object): + """ Merge data feed and array of transform feeds into a single result vector. 
+ PULL from feed + PULL from child transforms + PUSH merged message to client + + """ + + def __init__(self, feed_address, transform_address, result_address, sync, transform_list): + """ + """ + self.sync = sync + self.feed_address = feed_address + self.transform_address = transform_address + self.result_address = result_address + buffer_list = copy.copy(transform_list) + buffer_list.append("feed") #for the raw feed + self.data_buffer = qmsg.MergedParallelBuffer(buffer_list) + + def run(self): + """""" + try: + self.open() + self.process_all() + except: + qutil.LOGGER.exception("Exception during merge processing, attempting to close merge.") + finally: + self.close() + + def open(self): + """Establish zmq context, feed socket, result socket for client, and transform + socket to receive transformed events. Create and launch transforms. Will confirm + ready with the DataFeed at the conclusion.""" + self.context = zmq.Context() + + qutil.LOGGER.info("starting transforms merge") + #create the feed SUB. + self.feed_socket = self.context.socket(zmq.SUB) + self.feed_socket.connect(self.feed_address) + self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') + + #create the result PUSH + self.result_socket = self.context.socket(zmq.PUSH) + self.result_socket.bind(self.result_address) + + #create the transform PULL. + self.transform_socket = self.context.socket(zmq.PULL) + self.transform_socket.bind(self.transform_address) + self.data_buffer.out_socket = self.result_socket + + # Initialize poll set + self.poller = zmq.Poller() + self.poller.register(self.feed_socket, zmq.POLLIN) + self.poller.register(self.transform_socket, zmq.POLLIN) + + self.sync.confirm() + + def close(self): + """ + Close all zmq sockets and context. + """ + self.transform_socket.close() + self.feed_socket.close() + self.result_socket.close() + self.context.term() + + def process_all(self): + """ + Uses a Poller to receive messages from all transforms and the feed. 
+ All transforms corresponding to the same event are merged with each other + and the original feed event into a single message. That message is then + sent to the result socket. + """ + done_count = 0 + while True: + socks = dict(self.poller.poll()) + + if self.feed_socket in socks and socks[self.feed_socket] == zmq.POLLIN: + message = self.feed_socket.recv() + if(message == "DONE"): + qutil.LOGGER.info("finished receiving feed to merge") + done_count += 1 + else: + event = json.loads(message) + self.data_buffer.append("feed",event) + + if self.transform_socket in socks and socks[self.transform_socket] == zmq.POLLIN: + t_message = self.transform_socket.recv() + if(t_message == "DONE"): + qutil.LOGGER.info("finished receiving a transform to merge") + done_count += 1 + else: + t_event = json.loads(t_message) + self.data_buffer.append(t_event['name'], t_event) + + if(done_count >= len(self.data_buffer)): + break #done! + + self.data_buffer.send_next() + + qutil.LOGGER.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages())) + #drain any remaining messages in the buffer self.data_buffer.drain() - - #send the DONE message - self.feed_socket.send("DONE") - qutil.LOGGER.info("received {n} messages, sent {m} messages".format(n=self.data_buffer.received_count, - m=self.data_buffer.sent_count)) - self.data_socket.close() - self.feed_socket.close() - self.context.term() - - + + #signal to client that we're done + self.result_socket.send("DONE") + + - - - diff --git a/qsim/messaging.py b/qsim/messaging.py index 4e47d1b2..aff0668c 100644 --- a/qsim/messaging.py +++ b/qsim/messaging.py @@ -101,29 +101,29 @@ class MergedParallelBuffer(ParallelBuffer): return result -class FeedSync(object): - """FeedSync instances register themselves with a DataFeed. Once the FeedSync - is created, the DataFeed is guaranteed to block until confirm is called on this - instance (and all others registered with the feed). 
Components can use instances - to delay the start of the feed until initial setup is complete.""" +class Sync(object): + """Sync instances register themselves with a Host. Once the Sync + is created, the Host is guaranteed to block until confirm is called on this + instance (and all others registered with the host). Components can use instances + to delay the start of the host until initial setup is complete.""" - def __init__(self, feed, name): - self.feed = feed + def __init__(self, host, name): + self.host = host self.sync_id = "{name}-{id}".format(name=name, id=uuid.uuid1()) - self.feed.register_sync(self.sync_id) - #qutil.LOGGER.info("registered {id} with feed".format(id=self.sync_id)) + self.host.register_sync(self.sync_id) + #qutil.LOGGER.info("registered {id} with host".format(id=self.sync_id)) def confirm(self): - """Confirm readiness with the DataFeed.""" + """Confirm readiness with the Host.""" context = zmq.Context() - #synchronize with feed + #synchronize with host sync_socket = context.socket(zmq.REQ) - sync_socket.connect(self.feed.sync_address) - # send a synchronization request to the feed + sync_socket.connect(self.host.sync_address) + # send a synchronization request to the host sync_socket.send(self.sync_id) - # wait for synchronization reply from the feed + # wait for synchronization reply from the host sync_socket.recv() sync_socket.close() context.term() - qutil.LOGGER.info("sync'd feed from {id}".format(id = self.sync_id)) + qutil.LOGGER.info("sync'd host from {id}".format(id = self.sync_id)) \ No newline at end of file diff --git a/qsim/sources.py b/qsim/sources.py index 90c2b63c..8c90be9f 100644 --- a/qsim/sources.py +++ b/qsim/sources.py @@ -16,34 +16,27 @@ class DataSource(object): means looping through all records in a store, converting to a dict, and calling send(map). 
""" - def __init__(self, feed, source_id): + def __init__(self, source_id): self.source_id = source_id - self.feed = feed + self.data_address = None + self.sync = None self.cur_event = None self.context = None self.data_socket = None - - def start(self): - """Launch the datasource in a separate process.""" - proc = multiprocessing.Process(target=self.run) - proc.start() - - + def open(self): """create zmq context and socket""" qutil.LOGGER.info( "starting data source:{source_id} on {addr}" - .format(source_id=self.source_id, addr=self.feed.data_address)) + .format(source_id=self.source_id, addr=self.data_address)) self.context = zmq.Context() #create the data sink. Based on http://zguide.zeromq.org/py:tasksink2 self.data_socket = self.context.socket(zmq.PUSH) - self.data_socket.connect(self.feed.data_address) + self.data_socket.connect(self.data_address) - #signal we are ready - sync = qmsg.FeedSync(self.feed, str(self.source_id)) - sync.confirm() + self.sync.confirm() def run(self): """Fully execute this datasource.""" @@ -80,8 +73,8 @@ class DataSource(object): class RandomEquityTrades(DataSource): """Generates a random stream of trades for testing.""" - def __init__(self, sid, feed, source_id, count): - DataSource.__init__(self, feed, source_id) + def __init__(self, sid, source_id, count): + DataSource.__init__(self, source_id) self.count = count self.sid = sid diff --git a/qsim/test/client.py b/qsim/test/client.py index ec5ca471..a23b3f1f 100644 --- a/qsim/test/client.py +++ b/qsim/test/client.py @@ -9,11 +9,9 @@ import qsim.messaging as qmsg class TestClient(object): - def __init__(self,feed, address, bind=False): - self.feed = feed - self.address = address - self.sync = qmsg.FeedSync(feed, "testclient") - self.bind = bind + def __init__(self): + self.address = None + self.sync = None self.received_count = 0 def run(self): @@ -23,12 +21,8 @@ class TestClient(object): self.data_feed = self.context.socket(zmq.PULL) - if(self.bind): - qutil.LOGGER.info("binding 
to {address}".format(address=self.address)) - self.data_feed.bind(self.address) - else: - qutil.LOGGER.info("connecting to {address}".format(address=self.address)) - self.data_feed.connect(self.address) + qutil.LOGGER.info("connecting to {address}".format(address=self.address)) + self.data_feed.connect(self.address) self.sync.confirm() diff --git a/qsim/test/test_messaging.py b/qsim/test/test_messaging.py index d5f33bfa..3653d983 100644 --- a/qsim/test/test_messaging.py +++ b/qsim/test/test_messaging.py @@ -6,9 +6,9 @@ Test suite for the messaging infrastructure of QSim. import unittest2 as unittest import multiprocessing -from qsim.core import DataFeed -from qsim.transforms.core import MergedTransformsFeed +from qsim.core import Simulator from qsim.transforms.technical import MovingAverage +from qsim.sources import RandomEquityTrades import qsim.util as qutil from qsim.test.client import TestClient @@ -20,71 +20,44 @@ class MessagingTestCase(unittest.TestCase): def setUp(self): """generate some config objects for the datafeed, sources, and transforms.""" - qutil.configure_logging() - qutil.LOGGER.info("testing...") - self.total_data_count = 800 - self.feed_config = {'emt1':{'sid':133, 'class':'RandomEquityTrades', 'count':400}, - 'emt2':{'sid':134, 'class':'RandomEquityTrades', 'count':400}} - self.feed = DataFeed(self.feed_config) - self.feed_proc = multiprocessing.Process(target=self.feed.run) - - self.config = {} - self.config['name'] = '**merged feed**' - self.config['transforms'] = [{'name':'mavg1', 'class':'MovingAverage', 'hours':1}, - {'name':'mavg2', 'class':'MovingAverage', 'hours':2}] + qutil.configure_logging() - def test_client(self): - """directly connect the test client to the feed, using two random data sources""" - - #subscribe a client to the multiplexed feed - client = TestClient(self.feed, self.feed.feed_address) - - feed_proc = multiprocessing.Process(target=self.feed.run) - feed_proc.start() + def test_sources_only(self): + """streams events 
from two data sources, no transforms.""" + + ret1 = RandomEquityTrades(133, "ret1", 400) + ret2 = RandomEquityTrades(134, "ret2", 400) + sources = {"ret1":ret1, "ret2":ret2} + client = TestClient() + sim = Simulator(sources, {}, client) + sim.launch() - client.run() - self.assertEqual(self.feed.data_buffer.pending_messages(), 0, + self.assertEqual(sim.feed.data_buffer.pending_messages(), 0, "The feed should be drained of all messages, found {n} remaining." - .format(n=self.feed.data_buffer.pending_messages())) - self.assertEqual(self.total_data_count, client.received_count, + .format(n=sim.feed.data_buffer.pending_messages())) + self.assertEqual(800, client.received_count, "The client should have received ({n}) the same number of messages as the feed sent ({m})." - .format(n=client.received_count, m=self.total_data_count)) + .format(n=client.received_count, m=800)) - def dtest_moving_average_to_client(self): - """2 datasources -> feed -> moving average transform -> testclient - verify message count at client.""" - - mavg = MovingAverage(self.feed, self.config['transforms'][0], result_address="tcp://127.0.0.1:20202") - mavg_proc = multiprocessing.Process(target=mavg.run) - mavg_proc.start() - - client = TestClient(self.feed, mavg.result_address, bind=True) - - feed_proc = multiprocessing.Process(target=self.feed.run) - feed_proc.start() - - client.run() - self.assertEqual(self.feed.data_buffer.pending_messages(), 0, "The feed should be drained of all messages.") - self.assertEqual(self.total_data_count, client.received_count, - "The client should have received the same number of messages as the feed sent.") - - def dtest_merged_to_client(self): + def test_merged_to_client(self): """ 2 datasources -> feed -> 2 moving average transforms -> transform merge -> testclient verify message count at client. 
""" - merger = MergedTransformsFeed(self.feed, self.config) - merger_proc = multiprocessing.Process(target=merger.run) - merger_proc.start() - - client = TestClient(self.feed, merger.result_address) - - feed_proc = multiprocessing.Process(target=self.feed.run) - feed_proc.start() - - client.run() - self.assertEqual(self.feed.data_buffer.pending_messages(), 0, "The feed should be drained of all messages.") - self.assertEqual(self.total_data_count, client.received_count, + + ret1 = RandomEquityTrades(133, "ret1", 400) + ret2 = RandomEquityTrades(134, "ret2", 400) + sources = {"ret1":ret1, "ret2":ret2} + mavg1 = MovingAverage("mavg1", 30) + mavg2 = MovingAverage("mavg2", 60) + transforms = {"mavg1":mavg1, "mavg2":mavg2} + client = TestClient() + sim = Simulator(sources, {}, client) + sim.launch() + + + self.assertEqual(sim.feed.data_buffer.pending_messages(), 0, "The feed should be drained of all messages.") + self.assertEqual(800, client.received_count, "The client should have received the same number of messages as the feed sent.") diff --git a/qsim/transforms/core.py b/qsim/transforms/core.py deleted file mode 100644 index 79464466..00000000 --- a/qsim/transforms/core.py +++ /dev/null @@ -1,232 +0,0 @@ -""" -""" -import zmq -import json -import copy -import multiprocessing -import zmq - -import qsim.util as qutil -import qsim.messaging as qmsg -import qsim.config as config - -class BaseTransform(object): - """Parent class for feed transforms. Subclass and override transform - method to create a new derived value from the combined feed.""" - - def __init__(self, feed, config_dict, result_address): - """ - :feed_address: zmq socket address, Transform will CONNECT a PULL socket and receive messages until "DONE" is received. 
- :result_address: zmq socket address, Transform will CONNECT a PUSH socket and send messaes until feed_socket receives "DONE" - :sync_address: zmq socket address, Transform will CONNECT a REQ socket and send/receive one message before entering feed loop - :config: must be a dict that can be wrapped in a config.Config object with at least an entry for 'name':string value - :server: if True, transform will bind to the result address (and act as a server), if False it will connect. The - the last transform in a series should be server=True so that clients can connect. - """ - - self.feed = feed - self.result_address = result_address - self.config = config.Config(config_dict) - self.state = {} - self.state['name'] = self.config.name - self.sync = qmsg.FeedSync(feed, self.state['name']) - self.received_count = 0 - self.sent_count = 0 - self.context = None - - def run(self): - """Top level execution entry point for the transform:: - - - connects to the feed socket to subscribe to events - - connets to the result socket (most oftened bound by a TransformsMerge) to PUSH transforms - - processes all messages received from feed, until DONE message received - - pushes all transforms - - sends DONE to result socket, closes all sockets and context""" - self.open() - self.process_all() - self.close() - - def open(self): - """ - Establishes zmq connections. - """ - self.context = zmq.Context() - - qutil.LOGGER.info("starting {name} transform". - format(name = self.state['name'])) - #create the feed SUB. 
- self.feed_socket = self.context.socket(zmq.SUB) - self.feed_socket.connect(self.feed.feed_address) - self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') - - #create the result PUSH - self.result_socket = self.context.socket(zmq.PUSH) - self.result_socket.connect(self.result_address) - - def process_all(self): - """ - Loops until feed's DONE message is received: - - receive an event from the data feed - - call transform (subclass' method) on event - - send the transformed event - """ - qutil.LOGGER.info("starting {name} event loop".format(name = self.state['name'])) - self.sync.confirm() - - while True: - message = self.feed_socket.recv() - if(message == "DONE"): - qutil.LOGGER.info("{name} received the Done message from the feed".format(name=self.state['name'])) - self.result_socket.send("DONE") - break; - self.received_count += 1 - event = json.loads(message) - cur_state = self.transform(event) - cur_state['dt'] = event['dt'] - cur_state['name'] = self.state['name'] - self.result_socket.send(json.dumps(cur_state)) - self.sent_count += 1 - - def close(self): - """ - Shut down zmq resources. - """ - qutil.LOGGER.info("Transform {name} recieved {r} and sent {s}".format(name=self.state['name'], r=self.received_count, s=self.sent_count)) - - self.feed_socket.close() - self.result_socket.close() - self.context.term() - - def transform(self, event): - """ Must return the transformed value as a map with {name:"name of new transform", value: "value of new field"} - Transforms run in parallel and results are merged into a single map, so transform names must be unique. - Best practice is to use the self.state object initialized from the transform configuration, and only set the - transformed value: - self.state['value'] = transformed_value - """ - return {} - -class MergedTransformsFeed(BaseTransform): - """ Merge data feed and array of transform feeds into a single result vector. 
- PULL from feed - PULL from child transforms - PUSH merged message to client - - """ - - def __init__(self, feed, props): - """ - :props: - must have an entry for 'transforms':array of dicts, which are - converted to configs. - """ - BaseTransform.__init__(self, feed, props, "tcp://127.0.0.1:20202") - self.transform_address = "tcp://127.0.0.1:{port}".format(port=10104) - self.transform_socket = None - self.create_transforms(self.config.transforms) - - - def create_transforms(self, configs): - """ - :configs: an array of config objects with a class property. Each type of transform needs - Create transforms based on configs, set each transform's result address to - this object's transform_address, so that all transformed events will be delivered - to this object. - """ - self.transforms = {} - for props in configs: - class_name = props['class'] - if(class_name == 'MovingAverage'): - mavg = ta.MovingAverage(self.feed, props, self.transform_address) - self.transforms[mavg.config.name] = mavg - - keys = copy.copy(self.transforms.keys()) - keys.append("feed") #for the raw feed - self.data_buffer = qmsg.MergedParallelBuffer(keys) - - self.buffers = {} - for name, transform in self.transforms.iteritems(): - self.buffers[name] = [] - - def open(self): - """Establish zmq context, feed socket, result socket for client, and transform - socket to receive transformed events. Create and launch transforms. Will confirm - ready with the DataFeed at the conclusion.""" - self.context = zmq.Context() - - qutil.LOGGER.info("starting {name} transform".format(name = self.state['name'])) - #create the feed SUB. - self.feed_socket = self.context.socket(zmq.SUB) - self.feed_socket.connect(self.feed.feed_address) - self.feed_socket.setsockopt(zmq.SUBSCRIBE,'') - - #create the result PUSH - self.result_socket = self.context.socket(zmq.PUSH) - self.result_socket.bind(self.result_address) - - #create the transform PULL. 
- self.transform_socket = self.context.socket(zmq.PULL) - self.transform_socket.bind(self.transform_address) - self.data_buffer.out_socket = self.result_socket - - # Initialize poll set - self.poller = zmq.Poller() - self.poller.register(self.feed_socket, zmq.POLLIN) - self.poller.register(self.transform_socket, zmq.POLLIN) - - self.sync.confirm() - - def close(self): - """ - Close all zmq sockets and context. - """ - self.transform_socket.close() - BaseTransform.close(self) - - def process_all(self): - """ - Uses a Poller to receive messages from all transforms and the feed. - All transforms corresponding to the same event are merged with each other - and the original feed event into a single message. That message is then - sent to the result socket. - """ - done_count = 0 - while True: - socks = dict(self.poller.poll()) - - if self.feed_socket in socks and socks[self.feed_socket] == zmq.POLLIN: - message = self.feed_socket.recv() - if(message == "DONE"): - qutil.LOGGER.info("finished receiving feed to merge") - done_count += 1 - else: - self.received_count += 1 - event = json.loads(message) - self.data_buffer.append("feed",event) - - if self.transform_socket in socks and socks[self.transform_socket] == zmq.POLLIN: - t_message = self.transform_socket.recv() - if(t_message == "DONE"): - qutil.LOGGER.info("finished receiving a transform to merge") - done_count += 1 - else: - self.received_count += 1 - t_event = json.loads(t_message) - self.data_buffer.append(t_event['name'], t_event) - - if(done_count >= len(self.data_buffer)): - break #done! 
- - self.data_buffer.send_next() - - qutil.LOGGER.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count)) - qutil.LOGGER.info("about to drain {n} messages from merger's buffer".format(n=self.data_buffer.pending_messages())) - - #drain any remaining messages in the buffer - self.data_buffer.drain() - - #signal to client that we're done - self.result_socket.send("DONE") - qutil.LOGGER.info("Transform {name} received {r} and sent {s}".format(name=self.state['name'], r=self.data_buffer.received_count, s=self.data_buffer.sent_count)) - - - \ No newline at end of file diff --git a/qsim/transforms/technical.py b/qsim/transforms/technical.py index 43ff8477..72464e47 100644 --- a/qsim/transforms/technical.py +++ b/qsim/transforms/technical.py @@ -7,25 +7,19 @@ TODO: add trailing stop import datetime import qsim.util as qutil -from qsim.transforms.core import BaseTransform +from qsim.core import BaseTransform class MovingAverage(BaseTransform): """ Calculate a unweighted moving average for props['sid'] security - TODO: add sid filter. + TODO: add sid -> mvavg dict. 
""" - def __init__(self, feed, props, result_address): - BaseTransform.__init__(self, feed, props, result_address) + def __init__(self, name, days): + BaseTransform.__init__(self, name) self.events = [] - self.window = datetime.timedelta(days = self.config.get_integer('days'), - seconds = self.config.get_integer('seconds'), - microseconds = self.config.get_integer('microseconds'), - milliseconds = self.config.get_integer('milliseconds'), - minutes = self.config.get_integer('minutes'), - hours = self.config.get_integer('hours'), - weeks = self.config.get_integer('weeks')) + self.window = datetime.timedelta(days = days) @@ -33,21 +27,21 @@ class MovingAverage(BaseTransform): def transform(self, event): """Update the moving average with the latest data point.""" - self.events.append(event) + #self.events.append(event) #filter the event list to the window length. - self.events = [x for x in self.events if (qutil.parse_date(x['dt']) - qutil.parse_date(event['dt'])) <= self.window] + #self.events = [x for x in self.events if (qutil.parse_date(x['dt']) - qutil.parse_date(event['dt'])) <= self.window] - if(len(self.events) == 0): - return 0.0 + #if(len(self.events) == 0): + # return 0.0 - total = 0.0 - for event in self.events: - total += event['price'] + #total = 0.0 + #for event in self.events: + # total += event['price'] - self.average = total/len(self.events) - - self.state['value'] = self.average + #self.average = total/len(self.events) + #self.state['value'] = self.average + self.state['value'] = 10 return self.state \ No newline at end of file