multiple sources, each as component, feeding sort.

This commit is contained in:
fawce
2012-08-02 22:51:17 -04:00
parent 193dc20a7a
commit f166626ea8
5 changed files with 113 additions and 110 deletions
+34 -28
View File
@@ -64,18 +64,19 @@ class ComponentTestCase(TestCase):
'count' : count
}
trade_gen = SpecificEquityTrades(*args_a, **kwargs_a)
monitor.add_to_topology(trade_gen.get_hash())
launch_monitor(monitor)
comp_a = Component(
SpecificEquityTrades(*args_a, **kwargs_a),
trade_gen,
monitor,
socket_uri,
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
launch_monitor(monitor)
iter_a = iter(comp_a)
ev = iter_a.next()
return
for event in comp_a:
log.info(event)
@@ -96,16 +97,8 @@ class ComponentTestCase(TestCase):
'filter' : filter,
'count' : count
}
comp_a = Component(
SpecificEquityTrades(*args_a, **kwargs_a),
monitor,
socket_uris[0],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
trade_gen_a = SpecificEquityTrades(*args_a, **kwargs_a)
monitor.add_to_topology(trade_gen_a.get_hash())
#Set up source b. Two minutes between events.
args_b = tuple()
@@ -116,15 +109,8 @@ class ComponentTestCase(TestCase):
'filter' : filter,
'count' : count
}
comp_b = Component(
SpecificEquityTrades(*args_b, **kwargs_b),
monitor,
socket_uris[1],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
trade_gen_b = SpecificEquityTrades(*args_b, **kwargs_b)
monitor.add_to_topology(trade_gen_b.get_hash())
#Set up source c. Three minutes between events.
args_c = tuple()
@@ -136,18 +122,38 @@ class ComponentTestCase(TestCase):
'count' : count
}
trade_gen_c = SpecificEquityTrades(*args_c, **kwargs_c)
monitor.add_to_topology(trade_gen_c.get_hash())
launch_monitor(monitor)
comp_a = Component(
trade_gen_a,
monitor,
socket_uris[0],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
comp_b = Component(
trade_gen_b,
monitor,
socket_uris[1],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
comp_c = Component(
SpecificEquityTrades(*args_c, **kwargs_c),
trade_gen_c,
monitor,
socket_uris[2],
DATASOURCE_FRAME,
DATASOURCE_UNFRAME
)
launch_monitor(monitor)
sources = [comp_a, comp_b, comp_c]
gens = [iter(source) for source in sources]
sorted_out = date_sorted_sources(gens)
sorted_out = date_sorted_sources(*sources)
prev = None
sort_count = 0
+76 -78
View File
@@ -83,10 +83,6 @@ class Component(object):
self.unframe = unframe
self.prefix = ""
# register two components with the monitor
monitor.add_to_topology(self.component_id)
monitor.add_to_topology("FORK-"+self.component_id)
# TODO: state_flag is deprecated, remove
self.state_flag = COMPONENT_STATE.OK
@@ -98,10 +94,19 @@ class Component(object):
self.guid = uuid.uuid4()
self.huid = humanhash.humanize(self.guid.hex)
# first, start the generator in its own process. Once
# Monitor says "go", Events from the generator will be
# FRAME'd and PUSH'd to self.socket_uri.
proc = multiprocessing.Process(
target=self.loop_send
)
proc.start()
# ------------
# Generator
# Message Receiver/Generator
# ------------
self.gen = None
self.recv_gen = self.create_recv_gen()
# ------------
@@ -109,94 +114,87 @@ class Component(object):
# ------------
def _run_out(self):
def loop_send(self):
"""
The main component loop. This is wrapped inside a
exception reporting context inside of run.
The core logic of the all components is run here.
"""
# The process title so you can watch it in top, ps.
setproctitle(self.generator.__class__.__name__)
self.prefix = "FORK-"
log.info("Start %r" % self)
log.info("Pid %s" % os.getpid())
log.info("Group %s" % os.getpgrp())
self.open()
self.signal_ready()
self.lock_ready()
self.wait_ready()
# -----------------------
# YOU SHALL NOT PASS!!!!!
# -----------------------
# ... until the monitor signals GO
for event in self.generator:
self.heartbeat()
event.source_id = self.get_id
msg = self.frame(event)
self.out_socket.send(msg)
self.signal_done()
def _run_in(self):
self.open(send=False)
self.signal_ready()
self.lock_ready()
self.wait_ready()
# -----------------------
# YOU SHALL NOT PASS!!!!!
# -----------------------
# ... until the monitor signals GO
# return the generator
for event in gen_from_poller(self.poll, self.in_socket, self.unframe):
event.source_id = self.get_id
yield event
self.signal_done()
def run_safe(self, func):
"""
Run a function that is assumed to include wait_ready and
heartbeat. Used to wrap fork_generator and consume_gen.
"""
try:
return func()
# The process title so you can watch it in top, ps.
setproctitle(self.generator.__class__.__name__)
self.prefix = "FORK-"
log.info("Start %r" % self)
log.info("Pid %s" % os.getpid())
log.info("Group %s" % os.getpgrp())
self.open()
self.signal_ready()
self.lock_ready()
self.wait_ready()
# -----------------------
# YOU SHALL NOT PASS!!!!!
# -----------------------
# ... until the monitor signals GO
for event in self.generator:
self.heartbeat()
msg = self.frame(event)
self.out_socket.send(msg)
self.signal_done()
except Exception as exc:
if not isinstance(exc, KillSignal):
self.signal_exception(exc)
else:
# if we get a kill signal, forcibly close all the
# sockets.
self.teardown_sockets()
self.handle_exception(exc)
finally:
log.info("Exiting %r" % self)
def _launch(self):
# first, start the generator in its own process. Once
# Monitor says "go", Events from the generator will be
# FRAME'd and PUSH'd to self.socket_uri.
proc = multiprocessing.Process(
target=self.run_safe,
args=(self._run_out,)
)
proc.start()
def create_recv_gen(self):
try:
self.open(send=False)
self.signal_ready()
self.lock_ready()
# return the generator
return self.loop_recv()
except Exception as exc:
self.handle_exception(exc)
finally:
log.info("Created Recv Gen for %r" % self)
# Start the poller-generator, which will PULL messages
# from self.sockiet_uri, UNFRAME'd them, and yield them.
return self.run_safe(self._run_in)
def loop_recv(self):
try:
# we block on ready here until monitor sends the GO
self.wait_ready()
log.info("Starting to drain {id}".format(id=self.get_id))
for event in gen_from_poller(self.poll, self.in_socket, self.unframe):
self.heartbeat()
# event.source_id = self.get_id
yield event
self.signal_done()
except Exception as exc:
self.handle_exception(exc)
finally:
log.info("Exiting %r" % self)
def handle_exception(self, exc, re_raise=False):
if not isinstance(exc, KillSignal):
self.signal_exception(exc)
else:
# if we get a kill signal, forcibly close all the
# sockets.
self.teardown_sockets()
def __iter__(self):
if not self.gen:
self.gen = self._launch()
return self
return self.gen
def next(self):
return self.recv_gen.next()
# ----------------------------
# Cleanup & Modes of Failure
+1 -1
View File
@@ -120,7 +120,7 @@ class Monitor(object):
return
def add_to_topology(self, component_id):
add = set([component_id])
add = set([component_id, "FORK-" + component_id])
self.topology.update(add)
def freeze_topology(self):
+1 -2
View File
@@ -43,8 +43,7 @@ def roundrobin(sources, namestrings):
"""
assert len(sources) == len(namestrings)
mapping = OrderedDict(zip(namestrings, sources))
import nose.tools; nose.tools.set_trace()
# While our generators have not been exhausted, pull elements
while mapping.keys() != []:
for namestring, source in mapping.iteritems():
+1 -1
View File
@@ -21,7 +21,7 @@ def gen_from_pull_socket(socket_uri, context, unframe):
def gen_from_poller(poller, in_socket, unframe):
while True:
socks = dict(poller.poll(1000))
socks = dict(poller.poll())
if socks.get(in_socket) == zmq.POLLIN:
message = in_socket.recv()