updated namedict based on sdiehl's feedback, modified ParallelBuffer.get_next() method to be more readable.

This commit is contained in:
fawce
2012-03-03 11:45:04 -05:00
parent 12be673011
commit 5c0bb6acae
3 changed files with 35 additions and 17 deletions
+15 -10
View File
@@ -280,17 +280,22 @@ class ParallelBuffer(Component):
if not(self.is_full() or self.draining):
return
cur = None
earliest = None
cur_source = None
earliest_source = None
earliest_event = None
#iterate over the queues of events from all sources (1 queue per datasource)
for events in self.data_buffer.values():
if len(events) == 0:
continue
cur = events
if (earliest == None) or (cur[0].dt <= earliest[0].dt):
earliest = cur
cur_source = events
first_in_list = events[0]
if (earliest_event == None) or (first_in_list.dt <= earliest_event.dt):
earliest_event = first_in_list
earliest_source = cur_source
if earliest != None:
return earliest.pop(0)
if earliest_event != None:
return earliest_source.pop(0)
def is_full(self):
"""
@@ -380,7 +385,7 @@ class MergedParallelBuffer(ParallelBuffer):
source_id.
"""
self.data_buffer[event.__dict__.keys()[0]].append(event)
self.data_buffer[event.keys()[0]].append(event)
self.received_count += 1
@@ -549,8 +554,8 @@ class DataSource(Component):
"""
assert isinstance(event, zp.namedict)
event.__dict__['source_id'] = self.get_id
event.__dict__['type'] = self.get_type
event['source_id'] = self.get_id
event['type'] = self.get_type
try:
ds_frame = self.frame(event)
+19 -6
View File
@@ -98,6 +98,19 @@ class namedict(object):
self.__dict__['id'] = value
else:
self.__dict__[key] = value
def __getitem__(self, key):
return self.__dict__[key]
def keys(self):
return self.__dict__.keys()
def as_dict(self):
#TODO: make a copy?
return self.__dict__
def delete(self, key):
del(self.__dict__[key])
def merge(self, other_nd):
assert isinstance(other_nd, namedict)
@@ -248,7 +261,7 @@ def FEED_FRAME(event):
source_id = event.source_id
ds_type = event.type
PACK_DATE(event)
payload = event.__dict__
payload = event.as_dict()
return msgpack.dumps(payload)
def FEED_UNFRAME(msg):
@@ -317,7 +330,7 @@ def MERGE_FRAME(event):
event.TRANSACTION = TRANSFORM_TYPE.EMPTY
else:
event.TRANSACTION = TRANSACTION_FRAME(event.TRANSACTION)
payload = event.__dict__
payload = event.as_dict()
return msgpack.dumps(payload)
def MERGE_UNFRAME(msg):
@@ -474,7 +487,7 @@ def PACK_DATE(event):
epoch = long(event.dt.strftime('%s'))
event['epoch'] = epoch
event['micros'] = event.dt.microsecond
del(event.__dict__['dt'])
event.delete('dt')
return event
def UNPACK_DATE(payload):
@@ -482,9 +495,9 @@ def UNPACK_DATE(payload):
assert isinstance(payload.micros, numbers.Integral)
dt = datetime.datetime.fromtimestamp(payload.epoch)
dt = dt.replace(microsecond = payload.micros, tzinfo = pytz.utc)
del(payload.__dict__['epoch'])
del(payload.__dict__['micros'])
payload['dt'] = dt
payload.delete('epoch')
payload.delete('micros')
payload.dt = dt
return payload
DATASOURCE_TYPE = Enum(
+1 -1
View File
@@ -53,7 +53,7 @@ class FinanceTestCase(TestCase):
#check the transformed value, should only be in event, not trade.
self.assertTrue(event.helloworld == 2345.6)
del(event.__dict__['helloworld'])
event.delete('helloworld')
self.assertEqual(zp.namedict(trade), event)