diff --git a/zipline/messaging.py b/zipline/messaging.py index 65ef93b0..fbeadd90 100644 --- a/zipline/messaging.py +++ b/zipline/messaging.py @@ -280,17 +280,22 @@ class ParallelBuffer(Component): if not(self.is_full() or self.draining): return - cur = None - earliest = None + cur_source = None + earliest_source = None + earliest_event = None + #iterate over the queues of events from all sources (1 queue per datasource) for events in self.data_buffer.values(): if len(events) == 0: continue - cur = events - if (earliest == None) or (cur[0].dt <= earliest[0].dt): - earliest = cur + cur_source = events + first_in_list = events[0] + + if (earliest_event == None) or (first_in_list.dt <= earliest_event.dt): + earliest_event = first_in_list + earliest_source = cur_source - if earliest != None: - return earliest.pop(0) + if earliest_event != None: + return earliest_source.pop(0) def is_full(self): """ @@ -380,7 +385,7 @@ class MergedParallelBuffer(ParallelBuffer): source_id. """ - self.data_buffer[event.__dict__.keys()[0]].append(event) + self.data_buffer[event.keys()[0]].append(event) self.received_count += 1 @@ -549,8 +554,8 @@ class DataSource(Component): """ assert isinstance(event, zp.namedict) - event.__dict__['source_id'] = self.get_id - event.__dict__['type'] = self.get_type + event['source_id'] = self.get_id + event['type'] = self.get_type try: ds_frame = self.frame(event) diff --git a/zipline/protocol.py b/zipline/protocol.py index e7ffed1a..090a28cd 100644 --- a/zipline/protocol.py +++ b/zipline/protocol.py @@ -98,6 +98,19 @@ class namedict(object): self.__dict__['id'] = value else: self.__dict__[key] = value + + def __getitem__(self, key): + return self.__dict__[key] + + def keys(self): + return self.__dict__.keys() + + def as_dict(self): + #TODO: make a copy? + return self.__dict__ + + def delete(self, key): + del(self.__dict__[key]) def merge(self, other_nd): assert isinstance(other_nd, namedict) @@ -248,7 +261,7 @@ def FEED_FRAME(event): source_id = event.source_id ds_type = event.type PACK_DATE(event) - payload = event.__dict__ + payload = event.as_dict() return msgpack.dumps(payload) def FEED_UNFRAME(msg): @@ -317,7 +330,7 @@ def MERGE_FRAME(event): event.TRANSACTION = TRANSFORM_TYPE.EMPTY else: event.TRANSACTION = TRANSACTION_FRAME(event.TRANSACTION) - payload = event.__dict__ + payload = event.as_dict() return msgpack.dumps(payload) def MERGE_UNFRAME(msg): @@ -474,7 +487,7 @@ def PACK_DATE(event): epoch = long(event.dt.strftime('%s')) event['epoch'] = epoch event['micros'] = event.dt.microsecond - del(event.__dict__['dt']) + event.delete('dt') return event def UNPACK_DATE(payload): @@ -482,9 +495,9 @@ def UNPACK_DATE(payload): assert isinstance(payload.micros, numbers.Integral) dt = datetime.datetime.fromtimestamp(payload.epoch) dt = dt.replace(microsecond = payload.micros, tzinfo = pytz.utc) - del(payload.__dict__['epoch']) - del(payload.__dict__['micros']) - payload['dt'] = dt + payload.delete('epoch') + payload.delete('micros') + payload.dt = dt return payload DATASOURCE_TYPE = Enum( diff --git a/zipline/test/test_finance.py b/zipline/test/test_finance.py index d1f35271..60f88ad3 100644 --- a/zipline/test/test_finance.py +++ b/zipline/test/test_finance.py @@ -53,7 +53,7 @@ class FinanceTestCase(TestCase): #check the transformed value, should only be in event, not trade. self.assertTrue(event.helloworld == 2345.6) - del(event.__dict__['helloworld']) + event.delete('helloworld') self.assertEqual(zp.namedict(trade), event)