eliminated special handling for non-blocking components.

This commit is contained in:
fawce
2012-04-16 12:06:39 -04:00
parent 02d7f0a4c8
commit bd616a9ec0
3 changed files with 5 additions and 52 deletions
-7
View File
@@ -503,13 +503,6 @@ class Component(object):
"""
raise NotImplementedError
@property
def is_blocking(self):
"""
True if a zipline be held open for this component.
"""
return False
@property
def get_pure(self):
+2 -23
View File
@@ -212,15 +212,6 @@ class OrderDataSource(qmsg.DataSource):
def get_type(self):
return zp.DATASOURCE_TYPE.ORDER
#
@property
def is_blocking(self):
"""
This datasource is in a loop with the TradingSimulationClient,
so we don't want it to block processing.
"""
return True
def open(self):
qmsg.DataSource.open(self)
self.order_socket = self.bind_order()
@@ -243,7 +234,8 @@ class OrderDataSource(qmsg.DataSource):
# to potentially receive the client's done message before the
# controller or heartbeat times out.
# TODO: shouldn't this block until we receive a message?
# this will block for timeout/2, and return an empty dict if there
# are no messages.
socks = dict(self.poll.poll(self.heartbeat_timeout/2))
# see if the poller has results for the result_feed
@@ -294,19 +286,6 @@ class TransactionSimulator(qmsg.BaseTransform):
elif style == SIMULATION_STYLE.NOOP:
self.apply_trade_to_open_orders = self.simulate_noop
#
@property
def is_blocking(self):
"""
Including this explicitly for clarity, even though we are using the
default value. TransactionSimulator has a defined action for every
event type. Downstream components depend on the presence of the
TRANSACTION transform in all cases. When no transaction happens,
None is the value. Thus, we do want merging to block on the
availability of transaction messages.
"""
return True
def transform(self, event):
"""
Pulls one message from the event feed, then
+3 -22
View File
@@ -83,13 +83,9 @@ class ComponentHost(Component):
self.sync_register[component.get_id] = datetime.datetime.utcnow()
if isinstance(component, DataSource):
self.feed.add_source(component.get_id, component.is_blocking)
if not component.is_blocking:
self.feed.ds_finished_counter +=1
self.feed.add_source(component.get_id)
if isinstance(component, BaseTransform):
self.merge.add_source(component.get_id, component.is_blocking)
if not component.is_blocking:
self.feed.ds_finished_counter +=1
self.merge.add_source(component.get_id)
def unregister_component(self, component_id):
del self.components[component_id]
@@ -199,9 +195,6 @@ class Feed(Component):
self.sent_counters = Counter()
self.recv_counters = Counter()
# source_id -> boolean. True is for blocking
self.is_blocking_map = {}
def init(self):
pass
@@ -350,9 +343,6 @@ class Feed(Component):
all un-DONE, blocking sources.
"""
for source_id, events in self.data_buffer.iteritems():
if not self.is_blocking_map[source_id]:
continue
if len(events) == 0:
return False
return True
@@ -367,12 +357,11 @@ class Feed(Component):
total += len(events)
return total
def add_source(self, source_id, is_blocking=True):
def add_source(self, source_id):
"""
Add a data source to the buffer.
"""
self.data_buffer[source_id] = []
self.is_blocking_map[source_id] = is_blocking
def __len__(self):
"""
@@ -478,10 +467,6 @@ class BaseTransform(Component):
def get_type(self):
return COMPONENT_TYPE.CONDUIT
@property
def is_blocking(self):
return True
def open(self):
"""
Establishes zmq connections.
@@ -630,10 +615,6 @@ class DataSource(Component):
@property
def get_id(self):
return self.id
@property
def is_blocking(self):
return True
@property
def get_type(self):