diff --git a/zipline/component.py b/zipline/component.py index 7bcd8500..e4af443c 100644 --- a/zipline/component.py +++ b/zipline/component.py @@ -503,13 +503,6 @@ class Component(object): """ raise NotImplementedError - - @property - def is_blocking(self): - """ - True if a zipline be held open for this component. - """ - return False @property def get_pure(self): diff --git a/zipline/finance/trading.py b/zipline/finance/trading.py index cab1051a..97d8cebb 100644 --- a/zipline/finance/trading.py +++ b/zipline/finance/trading.py @@ -212,15 +212,6 @@ class OrderDataSource(qmsg.DataSource): def get_type(self): return zp.DATASOURCE_TYPE.ORDER - # - @property - def is_blocking(self): - """ - This datasource is in a loop with the TradingSimulationClient, - so we don't want it to block processing. - """ - return True - def open(self): qmsg.DataSource.open(self) self.order_socket = self.bind_order() @@ -243,7 +234,8 @@ class OrderDataSource(qmsg.DataSource): # to potentially receive the client's done message before the # controller or heartbeat times out. - # TODO: shouldn't this block until we receive a message? + # this will block for timeout/2, and return an empty dict if there + # are no messages. socks = dict(self.poll.poll(self.heartbeat_timeout/2)) # see if the poller has results for the result_feed @@ -294,19 +286,6 @@ class TransactionSimulator(qmsg.BaseTransform): elif style == SIMULATION_STYLE.NOOP: self.apply_trade_to_open_orders = self.simulate_noop - # - @property - def is_blocking(self): - """ - Including this explicitly for clarity, even though we are using the - default value. TransactionSimulator has a defined action for every - event type. Downstream components depend on the presence of the - TRANSACTION transform in all cases. When no transaction happens, - None is the value. Thus, we do want merging to block on the - availability of transaction messages. - """ - return True - def transform(self, event): """ Pulls one message from the event feed, then diff --git a/zipline/messaging.py b/zipline/messaging.py index b36dea95..03764807 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -83,13 +83,9 @@ class ComponentHost(Component): self.sync_register[component.get_id] = datetime.datetime.utcnow() if isinstance(component, DataSource): - self.feed.add_source(component.get_id, component.is_blocking) - if not component.is_blocking: - self.feed.ds_finished_counter +=1 + self.feed.add_source(component.get_id) if isinstance(component, BaseTransform): - self.merge.add_source(component.get_id, component.is_blocking) - if not component.is_blocking: - self.feed.ds_finished_counter +=1 + self.merge.add_source(component.get_id) def unregister_component(self, component_id): del self.components[component_id] @@ -199,9 +195,6 @@ class Feed(Component): self.sent_counters = Counter() self.recv_counters = Counter() - # source_id -> boolean. True is for blocking - self.is_blocking_map = {} - def init(self): pass @@ -350,9 +343,6 @@ class Feed(Component): all un-DONE, blocking sources. """ for source_id, events in self.data_buffer.iteritems(): - if not self.is_blocking_map[source_id]: - continue - if len(events) == 0: return False return True @@ -367,12 +357,11 @@ class Feed(Component): total += len(events) return total - def add_source(self, source_id, is_blocking=True): + def add_source(self, source_id): """ Add a data source to the buffer. """ self.data_buffer[source_id] = [] - self.is_blocking_map[source_id] = is_blocking def __len__(self): """ @@ -478,10 +467,6 @@ class BaseTransform(Component): def get_type(self): return COMPONENT_TYPE.CONDUIT - @property - def is_blocking(self): - return True - def open(self): """ Establishes zmq connections. @@ -630,10 +615,6 @@ class DataSource(Component): @property def get_id(self): return self.id - - @property - def is_blocking(self): - return True @property def get_type(self):