mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-30 17:05:35 +08:00
started delinting messaging module
This commit is contained in:
+14
-1
@@ -1,3 +1,6 @@
|
||||
"""
|
||||
Commonly used messaging components.
|
||||
"""
|
||||
import json
|
||||
import uuid
|
||||
import zmq
|
||||
@@ -5,7 +8,8 @@ import zmq
|
||||
import util as qutil
|
||||
|
||||
class ParallelBuffer(object):
|
||||
""" holds several queues of events by key, allows retrieval in date order or by merging"""
|
||||
""" holds several queues of events by key, allows retrieval in date order
|
||||
or by merging"""
|
||||
def __init__(self, key_list):
|
||||
self.out_socket = None
|
||||
self.sent_count = 0
|
||||
@@ -66,6 +70,9 @@ class ParallelBuffer(object):
|
||||
|
||||
|
||||
class MergedParallelBuffer(ParallelBuffer):
|
||||
"""
|
||||
Merges multiple streams of events into single messages.
|
||||
"""
|
||||
|
||||
def __init__(self, keys):
|
||||
ParallelBuffer.__init__(self, keys)
|
||||
@@ -73,6 +80,7 @@ class MergedParallelBuffer(ParallelBuffer):
|
||||
self.data_buffer["feed"] = self.feed
|
||||
|
||||
def next(self):
|
||||
"""Get the next merged message from the feed buffer."""
|
||||
if(not(self.is_full() or self.draining)):
|
||||
return
|
||||
|
||||
@@ -87,6 +95,10 @@ class MergedParallelBuffer(ParallelBuffer):
|
||||
|
||||
|
||||
class FeedSync(object):
|
||||
"""FeedSync instances register themselves with a DataFeed. Once the FeedSync
|
||||
is created, the DataFeed is guaranteed to block until confirm is called on this
|
||||
instance (and all others registered with the feed). Components can use instances
|
||||
to delay the start of the feed until initial setup is complete."""
|
||||
|
||||
def __init__(self, feed, name):
|
||||
self.feed = feed
|
||||
@@ -95,6 +107,7 @@ class FeedSync(object):
|
||||
#qutil.logger.info("registered {id} with feed".format(id=self.id))
|
||||
|
||||
def confirm(self):
|
||||
"""Confirm readiness with the DataFeed."""
|
||||
context = zmq.Context()
|
||||
#synchronize with feed
|
||||
sync_socket = context.socket(zmq.REQ)
|
||||
|
||||
@@ -46,7 +46,7 @@ DATA_FEED_PORT = 30000
|
||||
class Backtest(object):
|
||||
|
||||
def __init__(self, db, logger):
|
||||
self.logger = logger
|
||||
qutil.logger = logger
|
||||
self.db = db
|
||||
self.feed = DataFeed(db, logger)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user