BLD: marketplace - ingestion downloads multiple files

This commit is contained in:
Victor Grau Serrat
2018-02-01 16:54:24 -07:00
parent 18123d7f7e
commit 56bd3db7a2
+27 -13
View File
@@ -1,4 +1,5 @@
import os
import re
import sys
import json
import hmac
@@ -11,12 +12,13 @@ import bcolz
import logbook
import pandas as pd
import six
from requests_toolbelt import MultipartDecoder
import requests
from web3 import Web3, HTTPProvider
from catalyst.constants import ( LOG_LEVEL, AUTH_SERVER, ETH_REMOTE_NODE,
MARKETPLACE_CONTRACT, MARKETPLACE_CONTRACT_ABI, ENIGMA_CONTRACT,
ENIGMA_CONTRACT_ABI )
from catalyst.constants import (
LOG_LEVEL, AUTH_SERVER, ETH_REMOTE_NODE, MARKETPLACE_CONTRACT,
MARKETPLACE_CONTRACT_ABI, ENIGMA_CONTRACT, ENIGMA_CONTRACT_ABI)
from catalyst.exchange.utils.stats_utils import set_print_settings
from catalyst.marketplace.marketplace_errors import (
MarketplacePubAddressEmpty, MarketplaceDatasetNotFound,
@@ -97,7 +99,7 @@ class Marketplace:
# ),
# dict(
# name='Influencers',
# desc='Tweets and related sentiments by selected influencers.',
# desc='Tweets & related sentiments by selected influencers.',
# start_date=pd.to_datetime('2017-01-01'),
# end_date=pd.to_datetime('2018-01-15'),
# data_frequencies=['daily', 'hour', 'minute'],
@@ -349,6 +351,8 @@ class Marketplace:
def ingest(self, dataset, data_frequency=None, start=None,
end=None, force_download=False):
dataset = dataset.lower()
address, address_i = self.choose_pubaddr()
check_sub = self.mkt_contract.functions.checkAddressSubscription(
@@ -369,7 +373,7 @@ class Marketplace:
secret = self.addresses[address_i]['secret']
else:
# TODO: Verify signature to obtain key/secret pair
key, secret = get_key_secret(datasource[0], dataset)
key, secret = get_key_secret(address, dataset)
nonce = str(int(time.time()))
@@ -382,15 +386,27 @@ class Marketplace:
'Nonce': nonce,
'Dataset': dataset}
r = requests.post('{}/ingest'.format(AUTH_SERVER), headers=headers)
r = requests.post('{}/marketplace/ingest'.format(AUTH_SERVER),
headers=headers, stream=True)
if r.status_code == 200:
decoder = MultipartDecoder.from_response(r)
for part in decoder.parts:
h = part.headers[b'Content-Disposition'].decode('utf-8')
filename = re.search(r'filename="(.*)"', h).group(1)
with open(filename, 'wb') as f:
# for chunk in part.content.iter_content(chunk_size=1024):
# if chunk: # filter out keep-alive new chunks
# f.write(chunk)
f.write(part.content)
else:
raise MarketplaceHTTPRequest(request='ingest dataset',
error=r.status_code)
print(r)
print('Download successful, now we need to ingest...')
exit(0)
dataset = dataset.lower()
# TODO: sync with Fred on the below and
# exiting for now
period = start.strftime('%Y-%m-%d')
tmp_folder = get_data_source(dataset, period, force_download)
@@ -404,8 +420,6 @@ class Marketplace:
else:
os.rename(tmp_folder, bundle_folder)
pass
def get_data_source(self, data_source_name, data_frequency=None,
start=None, end=None):
data_source_name = data_source_name.lower()