diff --git a/catalyst/marketplace/marketplace.py b/catalyst/marketplace/marketplace.py index 08249775..30860551 100644 --- a/catalyst/marketplace/marketplace.py +++ b/catalyst/marketplace/marketplace.py @@ -1,4 +1,5 @@ import os +import re import sys import json import hmac @@ -11,12 +12,13 @@ import bcolz import logbook import pandas as pd import six +from requests_toolbelt import MultipartDecoder import requests from web3 import Web3, HTTPProvider -from catalyst.constants import ( LOG_LEVEL, AUTH_SERVER, ETH_REMOTE_NODE, - MARKETPLACE_CONTRACT, MARKETPLACE_CONTRACT_ABI, ENIGMA_CONTRACT, - ENIGMA_CONTRACT_ABI ) +from catalyst.constants import ( + LOG_LEVEL, AUTH_SERVER, ETH_REMOTE_NODE, MARKETPLACE_CONTRACT, + MARKETPLACE_CONTRACT_ABI, ENIGMA_CONTRACT, ENIGMA_CONTRACT_ABI) from catalyst.exchange.utils.stats_utils import set_print_settings from catalyst.marketplace.marketplace_errors import ( MarketplacePubAddressEmpty, MarketplaceDatasetNotFound, @@ -97,7 +99,7 @@ class Marketplace: # ), # dict( # name='Influencers', - # desc='Tweets and related sentiments by selected influencers.', + # desc='Tweets & related sentiments by selected influencers.', # start_date=pd.to_datetime('2017-01-01'), # end_date=pd.to_datetime('2018-01-15'), # data_frequencies=['daily', 'hour', 'minute'], @@ -349,6 +351,8 @@ class Marketplace: def ingest(self, dataset, data_frequency=None, start=None, end=None, force_download=False): + dataset = dataset.lower() + address, address_i = self.choose_pubaddr() check_sub = self.mkt_contract.functions.checkAddressSubscription( @@ -369,7 +373,7 @@ class Marketplace: secret = self.addresses[address_i]['secret'] else: # TODO: Verify signature to obtain key/secret pair - key, secret = get_key_secret(datasource[0], dataset) + key, secret = get_key_secret(address, dataset) nonce = str(int(time.time())) @@ -382,15 +386,27 @@ class Marketplace: 'Nonce': nonce, 'Dataset': dataset} - r = requests.post('{}/ingest'.format(AUTH_SERVER), headers=headers) + r = requests.post('{}/marketplace/ingest'.format(AUTH_SERVER), + headers=headers, stream=True) + if r.status_code == 200: + decoder = MultipartDecoder.from_response(r) + for part in decoder.parts: + h = part.headers[b'Content-Disposition'].decode('utf-8') + filename = re.search(r'filename="(.*)"', h).group(1) + with open(filename, 'wb') as f: + # for chunk in part.content.iter_content(chunk_size=1024): + # if chunk: # filter out keep-alive new chunks + # f.write(chunk) + f.write(part.content) + else: + raise MarketplaceHTTPRequest(request='ingest dataset', + error=r.status_code) - print(r) + print('Download successful, now we need to ingest...') exit(0) - - - - dataset = dataset.lower() + # TODO: sync with Fred on the below and + # exiting for now period = start.strftime('%Y-%m-%d') tmp_folder = get_data_source(dataset, period, force_download) @@ -404,8 +420,6 @@ class Marketplace: else: os.rename(tmp_folder, bundle_folder) - pass - def get_data_source(self, data_source_name, data_frequency=None, start=None, end=None): data_source_name = data_source_name.lower()