diff --git a/catalyst/marketplace/marketplace.py b/catalyst/marketplace/marketplace.py index 225b2459..22344c1b 100644 --- a/catalyst/marketplace/marketplace.py +++ b/catalyst/marketplace/marketplace.py @@ -33,6 +33,7 @@ from catalyst.marketplace.utils.eth_utils import bin_hex, from_grains, \ from catalyst.marketplace.utils.path_utils import get_bundle_folder, \ get_data_source_folder, get_marketplace_folder, \ get_user_pubaddr, get_temp_bundles_folder, extract_bundle +from catalyst.utils.paths import ensure_directory if sys.version_info.major < 3: import urllib @@ -154,14 +155,14 @@ class Marketplace: 'Gas Price:\t\t[Accept the default value]\n' 'Nonce:\t\t\t{nonce}\n' 'Data:\t\t\t{data}\n'.format( - url=url, - _from=tx['from'], - to=tx['to'], - value=tx['value'], - gas=tx['gas'], - nonce=tx['nonce'], - data=tx['data'], ) - ) + url=url, + _from=tx['from'], + to=tx['to'], + value=tx['value'], + gas=tx['gas'], + nonce=tx['nonce'], + data=tx['data'], ) + ) webbrowser.open_new(url) @@ -221,16 +222,16 @@ class Marketplace: print(df_sets) dataset_num = input('Choose the dataset you want to ' 'subscribe to [0..{}]: '.format( - df_sets.size-1)) + df_sets.size - 1)) try: dataset_num = int(dataset_num) except ValueError: print('Enter a number between 0 and {}'.format( - df_sets.size-1)) + df_sets.size - 1)) else: if dataset_num not in range(0, df_sets.size): print('Enter a number between 0 and {}'.format( - df_sets.size-1)) + df_sets.size - 1)) else: dataset = df_sets.iloc[dataset_num]['dataset'] break @@ -296,14 +297,14 @@ class Marketplace: 'buy: {} ENG. Get enough ENG to cover the costs of the ' 'monthly\nsubscription for what you are trying to buy, ' 'and try again.'.format( - address, from_grains(balance), price)) + address, from_grains(balance), price)) return while True: agree_pay = input('Please confirm that you agree to pay {} ENG ' 'for a monthly subscription to the dataset "{}" ' 'starting today. 
[default: Y] '.format( - price, dataset)) or 'y' + price, dataset)) or 'y' if agree_pay.lower() not in ('y', 'n'): print("Please answer Y or N.") else: @@ -365,10 +366,10 @@ class Marketplace: 'Now processing second transaction.') tx = self.mkt_contract.functions.subscribe( - Web3.toHex(dataset), - ).buildTransaction({ - 'from': address, - 'nonce': self.web3.eth.getTransactionCount(address)}) + Web3.toHex(dataset), + ).buildTransaction({ + 'from': address, + 'nonce': self.web3.eth.getTransactionCount(address)}) if 'ropsten' in ETH_REMOTE_NODE: tx['gas'] = min(int(tx['gas'] * 1.5), 4700000) @@ -408,7 +409,7 @@ class Marketplace: 'You can now ingest this dataset anytime during the ' 'next month by running the following command:\n' 'catalyst marketplace ingest --dataset={}'.format( - dataset, address, dataset)) + dataset, address, dataset)) def process_temp_bundle(self, ds_name, path): """ @@ -425,7 +426,10 @@ class Marketplace: """ tmp_bundle = extract_bundle(path) - bundle_folder = get_data_source_folder(ds_name) + bundle_folder = os.path.join( + get_data_source_folder(ds_name), 'bundle' + ) + ensure_directory(bundle_folder) if os.listdir(bundle_folder): zsource = bcolz.ctable(rootdir=tmp_bundle, mode='r') ztarget = bcolz.ctable(rootdir=bundle_folder, mode='r') @@ -450,16 +454,16 @@ class Marketplace: print(df_sets) dataset_num = input('Choose the dataset you want to ' 'ingest [0..{}]: '.format( - df_sets.size-1)) + df_sets.size - 1)) try: dataset_num = int(dataset_num) except ValueError: print('Enter a number between 0 and {}'.format( - df_sets.size-1)) + df_sets.size - 1)) else: if dataset_num not in range(0, df_sets.size): print('Enter a number between 0 and {}'.format( - df_sets.size-1)) + df_sets.size - 1)) else: ds_name = df_sets.iloc[dataset_num]['dataset'] break @@ -491,10 +495,10 @@ class Marketplace: print('Your subscription to dataset "{}" expired on {} UTC.' 
# Matches strings that start with an ISO-style ``YYYY-MM-DD`` date; anything
# may follow (time of day, timezone, ...). Compiled once instead of once per
# inspected value.
_DATE_PATTERN = re.compile(r'^\d{4}-\d{2}-\d{2}.*$')

# Upper bound on the number of rows inspected per column when looking for
# date strings.
_DATE_SAMPLE_SIZE = 100


def sanitize_df(df):
    """
    Normalize the column dtypes of a DataFrame in place.

    Columns holding date-like strings (``YYYY-MM-DD...``) are converted
    with ``pd.to_datetime``; every other column is handed to
    ``safely_reduce_dtype`` so numeric data is stored in the narrowest
    dtype that keeps its values intact.

    Parameters
    ----------
    df: pandas.DataFrame
        The frame to sanitize. Its columns are reassigned in place.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, for convenience.

    """
    # Using a sampling method to identify dates for efficiency with
    # large datasets. Every row position, including the last one, is
    # eligible for inspection.
    row_count = len(df)
    if row_count > _DATE_SAMPLE_SIZE:
        positions = random.sample(range(row_count), _DATE_SAMPLE_SIZE)
    else:
        positions = range(row_count)

    for column in df.columns:
        values = df[column]

        # TODO: assuming that the date is at least daily
        is_date = any(
            isinstance(value, string_types) and _DATE_PATTERN.match(value)
            for value in (values.iloc[position] for position in positions)
        )

        if is_date:
            df[column] = pd.to_datetime(values)
            continue

        try:
            df[column] = safely_reduce_dtype(values)
        except Exception:
            # Narrowing is a best-effort storage optimization: a column
            # that cannot be narrowed keeps its current dtype rather than
            # aborting the whole merge.
            pass

    return df


def safely_reduce_dtype(ser):  # pandas.Series or numpy.array
    """
    Cast numeric data to the narrowest dtype that preserves every value.

    Integer data keeps its signedness and is narrowed to the smallest
    width whose range covers the observed minimum and maximum. Float data
    is narrowed only when converting to the smaller type and back
    reproduces every value exactly (NaN is treated as equal to NaN).
    Empty input and non-numeric dtypes are returned unchanged.

    Parameters
    ----------
    ser: pandas.Series or numpy.ndarray
        The data to narrow.

    Returns
    -------
    pandas.Series or numpy.ndarray
        ``ser`` cast to the narrower dtype, or ``ser`` itself when no
        narrower dtype is safe.

    """
    dtype = ser.dtype
    # Pandas extension dtypes (category, nullable ints, ...) are not numpy
    # dtypes and have no narrower numpy equivalent to pick from.
    if not isinstance(dtype, np.dtype) or dtype.kind not in 'iuf':
        return ser

    values = np.asarray(ser)
    if values.size == 0:
        # Nothing to measure, so there is no evidence for any other width.
        return ser

    if dtype.kind == 'f':
        candidates = (np.float16, np.float32, np.float64)
    elif dtype.kind == 'i':
        candidates = (np.int8, np.int16, np.int32, np.int64)
    else:
        candidates = (np.uint8, np.uint16, np.uint32, np.uint64)

    # Only strictly narrower types are worth considering.
    candidates = [
        np.dtype(candidate) for candidate in candidates
        if np.dtype(candidate).itemsize < dtype.itemsize
    ]

    if dtype.kind == 'f':
        # Out-of-range values become inf in the narrow type; that is
        # detected by the round-trip comparison, so silence the warning.
        with np.errstate(over='ignore', invalid='ignore'):
            for candidate in candidates:
                restored = values.astype(candidate).astype(dtype)
                unchanged = (restored == values) | (
                    np.isnan(restored) & np.isnan(values))
                if unchanged.all():
                    return ser.astype(candidate)
        return ser

    # Plain Python ints avoid mixed signed/unsigned numpy comparisons.
    lowest, highest = int(values.min()), int(values.max())
    for candidate in candidates:
        limits = np.iinfo(candidate)
        if limits.min <= lowest and highest <= limits.max:
            return ser.astype(candidate)
    return ser