mirror of
https://github.com/wassname/catalyst.git
synced 2026-06-29 19:30:52 +08:00
Backport removal of sync_ack
This commit is contained in:
@@ -40,11 +40,6 @@ class Component(object):
|
||||
:param addresses: a dict of name_string -> zmq port address strings.
|
||||
Must have the following entries
|
||||
|
||||
:param sync_address: socket address used for synchronizing the start of
|
||||
all workers, heartbeating, and exit notification
|
||||
will be used in REP/REQ sockets. Bind is always on
|
||||
the REP side.
|
||||
|
||||
:param data_address: socket address used for data sources to stream
|
||||
their records. Will be used in PUSH/PULL sockets
|
||||
between data sources and a Feed. Bind will always
|
||||
@@ -430,7 +425,6 @@ class Component(object):
|
||||
)
|
||||
self.control_out.send(done_frame)
|
||||
|
||||
self.receive_sync_ack()
|
||||
#notify internal work look that we're done
|
||||
self.done = True # TODO: use state flag
|
||||
|
||||
@@ -450,19 +444,6 @@ class Component(object):
|
||||
# ZeroMQ. Either zmq.Poller or gpoll.Poller .
|
||||
self.poll = self.zmq_poller()
|
||||
|
||||
def receive_sync_ack(self):
|
||||
"""
|
||||
Wait for synchronization reply from the host.
|
||||
|
||||
DEPRECATED, left in for compatability for now.
|
||||
"""
|
||||
|
||||
socks = dict(self.sync_poller.poll(self.heartbeat_timeout))
|
||||
if self.sync_socket in socks and socks[self.sync_socket] == self.zmq.POLLIN:
|
||||
message = self.sync_socket.recv()
|
||||
#else:
|
||||
#raise Exception("Sync ack timed out on response for {id}".format(id=self.get_id))
|
||||
|
||||
def bind_data(self):
|
||||
return self.bind_pull_socket(self.addresses['data_address'])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user