diff --git a/catalyst/__main__.py b/catalyst/__main__.py index 1c6d0870..6f3f97ab 100644 --- a/catalyst/__main__.py +++ b/catalyst/__main__.py @@ -1,11 +1,13 @@ import errno import os +import sys from functools import wraps import click import sys import logbook import pandas as pd +from catalyst.marketplace.marketplace import Marketplace from six import text_type from catalyst.data import bundles as bundles_module @@ -761,5 +763,130 @@ def bundles(): click.echo("%s %s" % (bundle, timestamp), sys.stdout) +@main.group() +@click.pass_context +def marketplace(ctx): + pass + + +@marketplace.command() +@click.pass_context +def ls(ctx): + click.echo('Listing of available data sources on the marketplace:', + sys.stdout) + marketplace = Marketplace() + marketplace.list() + + +@marketplace.command() +@click.option( + '--dataset', + default=None, + help='The name of the dataset to ingest from the Data Marketplace.', +) +@click.pass_context +def subscribe(ctx, dataset): + if dataset is None: + ctx.fail("must specify a dataset to subscribe to with '--dataset'\n" + "List available dataset on the marketplace with " + "'catalyst marketplace ls'") + marketplace = Marketplace() + marketplace.subscribe(dataset) + + +@marketplace.command() +@click.option( + '--dataset', + default=None, + help='The name of the dataset to ingest from the Data Marketplace.', +) +@click.option( + '-f', + '--data-frequency', + type=click.Choice({'daily', 'minute', 'daily,minute', 'minute,daily'}), + default='daily', + show_default=True, + help='The data frequency of the desired OHLCV bars.', +) +@click.option( + '-s', + '--start', + default=None, + type=Date(tz='utc', as_timestamp=True), + help='The start date of the data range. (default: one year from end date)', +) +@click.option( + '-e', + '--end', + default=None, + type=Date(tz='utc', as_timestamp=True), + help='The end date of the data range. (default: today)', +) +@click.pass_context +def ingest(ctx, dataset, data_frequency, start, end): + if dataset is None: + ctx.fail("must specify a dataset to clean with '--dataset'\n" + "List available dataset on the marketplace with " + "'catalyst marketplace ls'") + click.echo('Ingesting data: {}'.format(dataset), sys.stdout) + marketplace = Marketplace() + marketplace.ingest(dataset, data_frequency, start, end) + + +@marketplace.command() +@click.option( + '--dataset', + default=None, + help='The name of the dataset to ingest from the Data Marketplace.', +) +@click.pass_context +def clean(ctx, dataset): + if dataset is None: + ctx.fail("must specify a dataset to ingest with '--dataset'\n" + "List available dataset on the marketplace with " + "'catalyst marketplace ls'") + click.echo('Cleaning data source: {}'.format(dataset), sys.stdout) + marketplace = Marketplace() + marketplace.clean(dataset) + click.echo('Done', sys.stdout) + + +@marketplace.command() +@click.pass_context +def register(ctx): + marketplace = Marketplace() + marketplace.register() + + +@marketplace.command() +@click.option( + '--dataset', + default=None, + help='The name of the Marketplace dataset to publish data for.', +) +@click.option( + '--datadir', + default=None, + help='The folder that contains the CSV data files to publish.', +) +@click.option( + '--watch/--no-watch', + is_flag=True, + default=False, + help='Whether to watch the datadir for live data.', +) +@click.pass_context +def publish(ctx, dataset, datadir, watch): + marketplace = Marketplace() + if dataset is None: + ctx.fail("must specify a dataset to publish data for " + " with '--dataset'\n") + if datadir is None: + ctx.fail("must specify a datadir where to find the files to publish " + " with '--datadir'\n") + marketplace.publish(dataset, datadir, watch) + click.echo("%s %s" % (bundle, timestamp), sys.stdout) + + if __name__ == '__main__': main() diff --git a/catalyst/api.pyi b/catalyst/api.pyi index bf104664..6e46aba9 100644 --- a/catalyst/api.pyi +++ b/catalyst/api.pyi @@ -34,6 +34,7 @@ def attach_pipeline(pipeline, name, chunks=None): :func:`catalyst.api.pipeline_output` """ + def batch_market_order(share_counts): """Place a batch market order for multiple assets. @@ -48,6 +49,7 @@ def batch_market_order(share_counts): Index of ids for newly-created orders. """ + def cancel_order(order_param): """Cancel an open order. @@ -57,7 +59,9 @@ def cancel_order(order_param): The order_id or order object to cancel. """ -def continuous_future(root_symbol_str, offset=0, roll='volume', adjustment='mul'): + +def continuous_future(root_symbol_str, offset=0, roll='volume', + adjustment='mul'): """Create a specifier for a continuous contract. Parameters @@ -81,7 +85,10 @@ def continuous_future(root_symbol_str, offset=0, roll='volume', adjustment='mul' The continuous future specifier. """ -def fetch_csv(url, pre_func=None, post_func=None, date_column='date', date_format=None, timezone='UTC', symbol=None, mask=True, symbol_column=None, special_params_checker=None, **kwargs): + +def fetch_csv(url, pre_func=None, post_func=None, date_column='date', + date_format=None, timezone='UTC', symbol=None, mask=True, + symbol_column=None, special_params_checker=None, **kwargs): """Fetch a csv from a remote url and register the data so that it is queryable from the ``data`` object. @@ -125,6 +132,7 @@ def fetch_csv(url, pre_func=None, post_func=None, date_column='date', date_forma A requests source that will pull data from the url specified. """ + def future_symbol(symbol): """Lookup a futures contract with a given symbol. @@ -144,6 +152,7 @@ def future_symbol(symbol): Raised when no contract named 'symbol' is found. """ + def get_datetime(tz=None): """ Returns the current simulation datetime. @@ -159,6 +168,7 @@ dt : datetime The current simulation datetime converted to ``tz``. """ + def get_environment(field='platform'): """Query the execution environment. @@ -198,6 +208,7 @@ def get_environment(field='platform'): Raised when ``field`` is not a valid option. """ + def get_order(order_id): """Lookup an order based on the order id returned from one of the order functions. @@ -213,10 +224,12 @@ def get_order(order_id): The order object. """ + def history(bar_count, frequency, field, ffill=True): """DEPRECATED: use ``data.history`` instead. """ + def order(asset, amount, limit_price=None, stop_price=None, style=None): """Place an order. @@ -258,7 +271,9 @@ def order(asset, amount, limit_price=None, stop_price=None, style=None): :func:`catalyst.api.order_percent` """ -def order_percent(asset, percent, limit_price=None, stop_price=None, style=None): + +def order_percent(asset, percent, limit_price=None, stop_price=None, + style=None): """Place an order in the specified asset corresponding to the given percent of the current portfolio value. @@ -293,6 +308,7 @@ def order_percent(asset, percent, limit_price=None, stop_price=None, style=None) :func:`catalyst.api.order_value` """ + def order_target(asset, target, limit_price=None, stop_price=None, style=None): """Place an order to adjust a position to a target number of shares. If the position doesn't already exist, this is equivalent to placing a new @@ -344,7 +360,9 @@ def order_target(asset, target, limit_price=None, stop_price=None, style=None): :func:`catalyst.api.order_target_value` """ -def order_target_percent(asset, target, limit_price=None, stop_price=None, style=None): + +def order_target_percent(asset, target, limit_price=None, stop_price=None, + style=None): """Place an order to adjust a position to a target percent of the current portfolio value. If the position doesn't already exist, this is equivalent to placing a new order. If the position does exist, this is @@ -396,7 +414,9 @@ def order_target_percent(asset, target, limit_price=None, stop_price=None, style :func:`catalyst.api.order_target_value` """ -def order_target_value(asset, target, limit_price=None, stop_price=None, style=None): + +def order_target_value(asset, target, limit_price=None, stop_price=None, + style=None): """Place an order to adjust a position to a target value. If the position doesn't already exist, this is equivalent to placing a new order. If the position does exist, this is equivalent to placing an @@ -448,6 +468,7 @@ def order_target_value(asset, target, limit_price=None, stop_price=None, style=N :func:`catalyst.api.order_target_percent` """ + def order_value(asset, value, limit_price=None, stop_price=None, style=None): """Place an order by desired value rather than desired number of shares. @@ -488,6 +509,7 @@ def order_value(asset, value, limit_price=None, stop_price=None, style=None): :func:`catalyst.api.order_percent` """ + def pipeline_output(name): """Get the results of the pipeline that was attached with the name: ``name``. @@ -514,6 +536,7 @@ def pipeline_output(name): :meth:`catalyst.pipeline.engine.PipelineEngine.run_pipeline` """ + def record(*args, **kwargs): """Track and record values each day. @@ -529,7 +552,9 @@ def record(*args, **kwargs): :func:`~catalyst.run_algorithm`. """ -def schedule_function(func, date_rule=None, time_rule=None, half_days=True, calendar=None): + +def schedule_function(func, date_rule=None, time_rule=None, half_days=True, + calendar=None): """Schedules a function to be called according to some timed rules. Parameters @@ -549,6 +574,7 @@ def schedule_function(func, date_rule=None, time_rule=None, half_days=True, cale :class:`catalyst.api.time_rules` """ + def set_asset_restrictions(restrictions, on_error='fail'): """Set a restriction on which assets can be ordered. @@ -562,6 +588,7 @@ def set_asset_restrictions(restrictions, on_error='fail'): catalyst.finance.asset_restrictions.Restrictions """ + def set_benchmark(benchmark): """Set the benchmark asset. @@ -576,6 +603,7 @@ def set_benchmark(benchmark): automatically reinvested. """ + def set_cancel_policy(cancel_policy): """Sets the order cancellation policy for the simulation. @@ -590,6 +618,7 @@ def set_cancel_policy(cancel_policy): :class:`catalyst.api.NeverCancel` """ + def set_commission(commission): """Sets the commission model for the simulation. @@ -605,6 +634,7 @@ def set_commission(commission): :class:`catalyst.finance.commission.PerDollar` """ + def set_do_not_order_list(restricted_list, on_error='fail'): """Set a restriction on which assets can be ordered. @@ -614,11 +644,13 @@ def set_do_not_order_list(restricted_list, on_error='fail'): The assets that cannot be ordered. """ + def set_long_only(on_error='fail'): """Set a rule specifying that this algorithm cannot take short positions. """ + def set_max_leverage(max_leverage): """Set a limit on the maximum leverage of the algorithm. @@ -629,6 +661,7 @@ def set_max_leverage(max_leverage): be no maximum. """ + def set_max_order_count(max_count, on_error='fail'): """Set a limit on the number of orders that can be placed in a single day. @@ -639,7 +672,9 @@ def set_max_order_count(max_count, on_error='fail'): The maximum number of orders that can be placed on any single day. """ -def set_max_order_size(asset=None, max_shares=None, max_notional=None, on_error='fail'): + +def set_max_order_size(asset=None, max_shares=None, max_notional=None, + on_error='fail'): """Set a limit on the number of shares and/or dollar value of any single order placed for sid. Limits are treated as absolute values and are enforced at the time that the algo attempts to place an order for sid. @@ -658,7 +693,9 @@ def set_max_order_size(asset=None, max_shares=None, max_notional=None, on_error= The maximum value that can be ordered at one time. """ -def set_max_position_size(asset=None, max_shares=None, max_notional=None, on_error='fail'): + +def set_max_position_size(asset=None, max_shares=None, max_notional=None, + on_error='fail'): """Set a limit on the number of shares and/or dollar value held for the given sid. Limits are treated as absolute values and are enforced at the time that the algo attempts to place an order for sid. This means @@ -681,6 +718,7 @@ def set_max_position_size(asset=None, max_shares=None, max_notional=None, on_err The maximum value to hold for an asset. """ + def set_slippage(slippage): """Set the slippage model for the simulation. @@ -694,6 +732,7 @@ def set_slippage(slippage): :class:`catalyst.finance.slippage.SlippageModel` """ + def set_symbol_lookup_date(dt): """Set the date for which symbols will be resolved to their assets (symbols may map to different firms or underlying assets at @@ -705,6 +744,7 @@ def set_symbol_lookup_date(dt): The new symbol lookup date. """ + def sid(sid): """Lookup an Asset by its unique asset identifier. @@ -724,6 +764,7 @@ def sid(sid): When a requested ``sid`` does not map to any asset. """ + def symbol(symbol_str): """Lookup an Equity by its ticker symbol. @@ -748,6 +789,7 @@ def symbol(symbol_str): :func:`catalyst.api.set_symbol_lookup_date` """ + def symbols(*args): """Lookup multuple Equities as a list. @@ -773,3 +815,21 @@ def symbols(*args): :func:`catalyst.api.set_symbol_lookup_date` """ + +def get_data_source(data_source_name, data_frequency=None, + start=None, end=None): + """ + Lookup a data source from the marketplace + + Parameters + ---------- + self + data_source_name + data_frequency + start + end + + Returns + ------- + + """ diff --git a/catalyst/constants.py b/catalyst/constants.py index c4111fdd..df0ea82b 100644 --- a/catalyst/constants.py +++ b/catalyst/constants.py @@ -15,4 +15,32 @@ SYMBOLS_URL = 'https://s3.amazonaws.com/enigmaco/catalyst-exchanges/' \ DATE_TIME_FORMAT = '%Y-%m-%d %H:%M' DATE_FORMAT = '%Y-%m-%d' +try: + ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) +except Exception as e: + print('unable to get catalyst path: {}'.format(e)) + AUTO_INGEST = False + +AUTH_SERVER = 'https://data.enigma.co' + +# TODO: switch to mainnet +ETH_REMOTE_NODE = 'https://ropsten.infura.io/' + +# TODO: move to MASTER branch on github +MARKETPLACE_CONTRACT = 'https://raw.githubusercontent.com/enigmampc/' \ + 'catalyst/data-marketplace/catalyst/marketplace/' \ + 'contract_marketplace_address.txt' + +MARKETPLACE_CONTRACT_ABI = 'https://raw.githubusercontent.com/enigmampc/' \ + 'catalyst/data-marketplace/catalyst/marketplace/' \ + 'contract_marketplace_abi.json' + +# TODO: switch to mainnet +ENIGMA_CONTRACT = 'https://raw.githubusercontent.com/enigmampc/catalyst/' \ + 'data-marketplace/catalyst/marketplace/' \ + 'contract_enigma_address.txt' + +ENIGMA_CONTRACT_ABI = 'https://raw.githubusercontent.com/enigmampc/' \ + 'catalyst/data-marketplace/catalyst/marketplace/' \ + 'contract_enigma_abi.json' \ No newline at end of file diff --git a/catalyst/examples/mean_reversion_by_marketcap.py b/catalyst/examples/mean_reversion_by_marketcap.py new file mode 100644 index 00000000..6d31dab7 --- /dev/null +++ b/catalyst/examples/mean_reversion_by_marketcap.py @@ -0,0 +1,233 @@ +# For this example, we're going to write a simple momentum script. When the +# stock goes up quickly, we're going to buy; when it goes down quickly, we're +# going to sell. Hopefully we'll ride the waves. +import os +import tempfile +import time + +import pandas as pd +import talib +from logbook import Logger + +from catalyst import run_algorithm +from catalyst.api import symbol, record, order_target_percent, get_data_source +from catalyst.exchange.utils.stats_utils import set_print_settings, \ + get_pretty_stats +# We give a name to the algorithm which Catalyst will use to persist its state. +# In this example, Catalyst will create the `.catalyst/data/live_algos` +# directory. If we stop and start the algorithm, Catalyst will resume its +# state using the files included in the folder. +from catalyst.utils.paths import ensure_directory + +NAMESPACE = 'mean_reversion_simple' +log = Logger(NAMESPACE) + + +# To run an algorithm in Catalyst, you need two functions: initialize and +# handle_data. + +def initialize(context): + # This initialize function sets any data or variables that you'll use in + # your algorithm. For instance, you'll want to define the trading pair (or + # trading pairs) you want to backtest. You'll also want to define any + # parameters or values you're going to use. + + # In our example, we're looking at Neo in Ether. + df = get_data_source( + 'marketcap', start=context.datetime + ) # type: pd.DataFrame + + # Keep only the top coins by market cap + df = df.loc[df['market_cap_usd'].isin(df['market_cap_usd'].nlargest(100))] + + set_print_settings() + + df.sort_values(by=['market_cap_usd'], ascending=True, inplace=True) + print('the marketplace data:\n{}'.format(df)) + + # Pick the 5 assets with the lowest market cap for trading + quote_currency = 'eth' + exchange = context.exchanges[next(iter(context.exchanges))] + symbols = [a.symbol for a in exchange.assets + if a.start_date < context.datetime] + context.assets = [] + for currency, price in df['market_cap_usd'].iteritems(): + if len(context.assets) >= 5: + break + + s = '{}_{}'.format(currency.decode('utf-8'), quote_currency) + if s in symbols: + context.assets.append(symbol(s)) + + context.base_price = None + context.current_day = None + + context.RSI_OVERSOLD = 55 + context.RSI_OVERBOUGHT = 60 + context.CANDLE_SIZE = '5T' + + context.start_time = time.time() + + +def handle_data(context, data): + # This handle_data function is where the real work is done. Our data is + # minute-level tick data, and each minute is called a frame. This function + # runs on each frame of the data. + + # We flag the first period of each day. + # Since cryptocurrencies trade 24/7 the `before_trading_starts` handle + # would only execute once. This method works with minute and daily + # frequencies. + today = data.current_dt.floor('1D') + if today != context.current_day: + context.traded_today = dict() + context.current_day = today + + # Preparing dictionaries for asset-level data points + volumes = dict() + rsis = dict() + price_values = dict() + cash = context.portfolio.cash + + for asset in context.assets: + # We're computing the volume-weighted-average-price of the security + # defined above, in the context.assets variable. For this example, + # we're using three bars on the 15 min bars. + + # The frequency attribute determine the bar size. We use this + # convention for the frequency alias: + # http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases + prices = data.history( + asset, + fields='close', + bar_count=50, + frequency=context.CANDLE_SIZE + ) + + # Ta-lib calculates various technical indicator based on price and + # volume arrays. + + # In this example, we are comp + rsi = talib.RSI(prices.values, timeperiod=14) + + # We need a variable for the current price of the security to compare + # to the average. Since we are requesting two fields, data.current() + # returns a DataFrame with + current = data.current(asset, fields=['close', 'volume']) + price = current['close'] + + # If base_price is not set, we use the current value. This is the + # price at the first bar which we reference to calculate price_change. + # if asset not in context.base_price: + # context.base_price[asset] = price + # + # base_price = context.base_price[asset] + # price_change = (price - base_price) / base_price + + # Tracking the relevant data + volumes[asset] = current['volume'] + rsis[asset] = rsi[-1] + price_values[asset] = price + # price_changes[asset] = price_change + + # We are trying to avoid over-trading by limiting our trades to + # one per day. + if asset in context.traded_today: + continue + + # Exit if we cannot trade + if not data.can_trade(asset): + continue + + # Another powerful built-in feature of the Catalyst backtester is the + # portfolio object. The portfolio object tracks your positions, cash, + # cost basis of specific holdings, and more. In this line, we + # calculate how long or short our position is at this minute. + pos_amount = context.portfolio.positions[asset].amount + + if rsi[-1] <= context.RSI_OVERSOLD and pos_amount == 0: + log.info( + '{}: buying - price: {}, rsi: {}'.format( + data.current_dt, price, rsi[-1] + ) + ) + # Set a style for limit orders, + limit_price = price * 1.005 + target = 1.0 / len(context.assets) + order_target_percent( + asset, target, limit_price=limit_price + ) + context.traded_today[asset] = True + + elif rsi[-1] >= context.RSI_OVERBOUGHT and pos_amount > 0: + log.info( + '{}: selling - price: {}, rsi: {}'.format( + data.current_dt, price, rsi[-1] + ) + ) + limit_price = price * 0.995 + order_target_percent( + asset, 0, limit_price=limit_price + ) + context.traded_today[asset] = True + + # Now that we've collected all current data for this frame, we use + # the record() method to save it. This data will be available as + # a parameter of the analyze() function for further analysis. + record( + current_price=price_values, + volume=volumes, + rsi=rsis, + cash=cash, + ) + + +def analyze(context=None, perf=None): + stats = get_pretty_stats(perf) + print('the algo stats:\n{}'.format(stats)) + pass + + +if __name__ == '__main__': + # The execution mode: backtest or live + live = False + + if live: + run_algorithm( + capital_base=0.1, + initialize=initialize, + handle_data=handle_data, + analyze=analyze, + exchange_name='poloniex', + live=True, + algo_namespace=NAMESPACE, + base_currency='btc', + live_graph=False, + simulate_orders=False, + stats_output=None, + ) + + else: + folder = os.path.join( + tempfile.gettempdir(), 'catalyst', NAMESPACE + ) + ensure_directory(folder) + + timestr = time.strftime('%Y%m%d-%H%M%S') + out = os.path.join(folder, '{}.p'.format(timestr)) + # catalyst run -f catalyst/examples/mean_reversion_simple.py \ + # -x bitfinex -s 2017-10-1 -e 2017-11-10 -c usdt -n mean-reversion \ + # --data-frequency minute --capital-base 10000 + run_algorithm( + capital_base=100, + data_frequency='minute', + initialize=initialize, + handle_data=handle_data, + analyze=analyze, + exchange_name='poloniex', + algo_namespace=NAMESPACE, + base_currency='eth', + start=pd.to_datetime('2017-10-01', utc=True), + end=pd.to_datetime('2017-10-15', utc=True), + ) + log.info('saved perf stats: {}'.format(out)) diff --git a/catalyst/exchange/exchange_algorithm.py b/catalyst/exchange/exchange_algorithm.py index 5c83a74a..901f4ac3 100644 --- a/catalyst/exchange/exchange_algorithm.py +++ b/catalyst/exchange/exchange_algorithm.py @@ -43,6 +43,7 @@ from catalyst.finance.execution import MarketOrder from catalyst.finance.performance import PerformanceTracker from catalyst.finance.performance.period import calc_period_stats from catalyst.gens.tradesimulation import AlgorithmSimulator +from catalyst.marketplace.marketplace import Marketplace from catalyst.utils.api_support import api_method from catalyst.utils.input_validation import error_keywords, ensure_upper_case from catalyst.utils.math_utils import round_nearest @@ -92,6 +93,8 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm): attempts=self.attempts, ) + self._marketplace = None + @staticmethod def __convert_order_params_for_blotter(limit_price, stop_price, style): """ @@ -167,6 +170,16 @@ class ExchangeTradingAlgorithmBase(TradingAlgorithm): """ return round_nearest(amount, asset.min_trade_size) + @api_method + def get_data_source(self, data_source_name, data_frequency=None, + start=None, end=None): + if self._marketplace is None: + self._marketplace = Marketplace() + + return self._marketplace.get_data_source( + data_source_name, data_frequency, start, end, + ) + @api_method @preprocess(symbol_str=ensure_upper_case) def symbol(self, symbol_str, exchange_name=None): diff --git a/catalyst/marketplace/__init__.py b/catalyst/marketplace/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/marketplace/contract_enigma_abi.json b/catalyst/marketplace/contract_enigma_abi.json new file mode 100644 index 00000000..4dd70538 --- /dev/null +++ b/catalyst/marketplace/contract_enigma_abi.json @@ -0,0 +1,302 @@ +[ + { + "constant": true, + "inputs": [], + "name": "name", + "outputs": [ + { + "name": "", + "type": "string" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_spender", + "type": "address" + }, + { + "name": "_value", + "type": "uint256" + } + ], + "name": "approve", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "totalSupply", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_from", + "type": "address" + }, + { + "name": "_to", + "type": "address" + }, + { + "name": "_value", + "type": "uint256" + } + ], + "name": "transferFrom", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "INITIAL_SUPPLY", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "decimals", + "outputs": [ + { + "name": "", + "type": "uint8" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_spender", + "type": "address" + }, + { + "name": "_subtractedValue", + "type": "uint256" + } + ], + "name": "decreaseApproval", + "outputs": [ + { + "name": "success", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": false, + "inputs": [], + "name": "getAfterApproveTest", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_owner", + "type": "address" + } + ], + "name": "balanceOf", + "outputs": [ + { + "name": "balance", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "symbol", + "outputs": [ + { + "name": "", + "type": "string" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_to", + "type": "address" + }, + { + "name": "_value", + "type": "uint256" + } + ], + "name": "transfer", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_spender", + "type": "address" + }, + { + "name": "_addedValue", + "type": "uint256" + } + ], + "name": "increaseApproval", + "outputs": [ + { + "name": "success", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_owner", + "type": "address" + }, + { + "name": "_spender", + "type": "address" + } + ], + "name": "allowance", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "inputs": [ + { + "name": "testValue", + "type": "address" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "constructor" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "name": "owner", + "type": "address" + }, + { + "indexed": true, + "name": "spender", + "type": "address" + }, + { + "indexed": false, + "name": "value", + "type": "uint256" + } + ], + "name": "Approval", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "name": "from", + "type": "address" + }, + { + "indexed": true, + "name": "to", + "type": "address" + }, + { + "indexed": false, + "name": "value", + "type": "uint256" + } + ], + "name": "Transfer", + "type": "event" + } + ] \ No newline at end of file diff --git a/catalyst/marketplace/contract_enigma_address.txt b/catalyst/marketplace/contract_enigma_address.txt new file mode 100644 index 00000000..90fcd06a --- /dev/null +++ b/catalyst/marketplace/contract_enigma_address.txt @@ -0,0 +1 @@ +0x7fAec9aaE31BE428DeAAE1be8195dF609079Fd10 \ No newline at end of file diff --git a/catalyst/marketplace/contract_marketplace_abi.json b/catalyst/marketplace/contract_marketplace_abi.json new file mode 100644 index 00000000..4f0e2460 --- /dev/null +++ b/catalyst/marketplace/contract_marketplace_abi.json @@ -0,0 +1 @@ +[{"constant":true,"inputs":[{"name":"_dataSourceName","type":"bytes32"}],"name":"isActiveDataSource","outputs":[{"name":"isActive","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"mBegin","outputs":[{"name":"","type":"bytes32"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"FIXED_SUBSCRIPTION_PERIOD","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"_dataSourceName","type":"bytes32"},{"name":"_isPunished","type":"bool"}],"name":"setPunishProvider","outputs":[{"name":"success","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[{"name":"_dataSourceName","type":"bytes32"}],"name":"getDataProviderInfo","outputs":[{"name":"owner","type":"address"},{"name":"price","type":"uint256"},{"name":"volume","type":"uint256"},{"name":"subscriptionsNum","type":"uint256"},{"name":"isProvider","type":"bool"},{"name":"isActive","type":"bool"},{"name":"isPunished","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"_dataSourceName","type":"bytes32"}],"name":"subscribe","outputs":[{"name":"success","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"mProvidersSize","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"_dataSourceName","type":"bytes32"},{"name":"_newPrice","type":"uint256"}],"name":"updateDataSourcePrice","outputs":[{"name":"success","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"_dataSourceName","type":"bytes32"}],"name":"withdrawProvider","outputs":[{"name":"success","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"getAllProviders","outputs":[{"name":"","type":"bytes32[]"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"getMarketplaceTotalBalance","outputs":[{"name":"totalBalance","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"_dataSourceName","type":"bytes32"}],"name":"refundSubscriber","outputs":[{"name":"success","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"_dataSourceName","type":"bytes32"},{"name":"_price","type":"uint256"},{"name":"_dataOwner","type":"address"}],"name":"register","outputs":[{"name":"success","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"mCurrent","outputs":[{"name":"","type":"bytes32"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"_dataSourceName","type":"bytes32"}],"name":"getWithdrawAmount","outputs":[{"name":"withdrawAmount","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"_subscriber","type":"address"},{"name":"_dataSourceName","type":"bytes32"}],"name":"checkAddressSubscription","outputs":[{"name":"subscriber","type":"address"},{"name":"dataSourceName","type":"bytes32"},{"name":"price","type":"uint256"},{"name":"startTime","type":"uint256"},{"name":"endTime","type":"uint256"},{"name":"isUnExpired","type":"bool"},{"name":"isPaid","type":"bool"},{"name":"isPunishedProvider","type":"bool"},{"name":"isOrder","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"_dataSourceName","type":"bytes32"},{"name":"_isActive","type":"bool"}],"name":"changeDataSourceActivityStatus","outputs":[{"name":"success","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[{"name":"","type":"bytes32"}],"name":"mProviders","outputs":[{"name":"owner","type":"address"},{"name":"volume","type":"uint256"},{"name":"subscriptionsNum","type":"uint256"},{"name":"name","type":"bytes32"},{"name":"price","type":"uint256"},{"name":"isPunished","type":"bool"},{"name":"punishTimeStamp","type":"uint256"},{"name":"isProvider","type":"bool"},{"name":"isActive","type":"bool"},{"name":"nextProvider","type":"bytes32"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"","type":"bytes32"},{"name":"","type":"uint256"}],"name":"mOrders","outputs":[{"name":"dataSourceName","type":"bytes32"},{"name":"subscriber","type":"address"},{"name":"provider","type":"address"},{"name":"price","type":"uint256"},{"name":"startTime","type":"uint256"},{"name":"endTime","type":"uint256"},{"name":"isPaid","type":"bool"},{"name":"isOrder","type":"bool"},{"name":"isRefundPaid","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"mToken","outputs":[{"name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"_dataSourceName","type":"bytes32"}],"name":"getOwnerFromName","outputs":[{"name":"owner","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"_subscriber","type":"address"},{"name":"_dataSourceName","type":"bytes32"}],"name":"getRefundAmount","outputs":[{"name":"refundAmount","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"mOwner","outputs":[{"name":"","type":"address"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"MARKETPLACE_VERSION","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"_subscriber","type":"address"},{"name":"_dataSourceName","type":"bytes32"}],"name":"isExpiredSubscription","outputs":[{"name":"isExpired","type":"bool"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"_newOwner","type":"address"}],"name":"transferOwnership","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"inputs":[{"name":"_tokenAddress","type":"address"}],"payable":false,"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":true,"name":"previousOwner","type":"address"},{"indexed":true,"name":"newOwner","type":"address"}],"name":"OwnershipTransferred","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"editor","type":"address"},{"indexed":true,"name":"dataSourceName","type":"bytes32"},{"indexed":false,"name":"newPrice","type":"uint256"}],"name":"PriceUpdate","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"editor","type":"address"},{"indexed":true,"name":"dataSourceName","type":"bytes32"},{"indexed":false,"name":"newStatus","type":"bool"}],"name":"ActivityUpdate","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dataOwner","type":"address"},{"indexed":true,"name":"dataSourceName","type":"bytes32"},{"indexed":false,"name":"price","type":"uint256"},{"indexed":false,"name":"success","type":"bool"}],"name":"Registered","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"SubscriptionDeposited","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"subscriber","type":"address"},{"indexed":true,"name":"dataSourceName","type":"bytes32"},{"indexed":true,"name":"dataOwner","type":"address"},{"indexed":false,"name":"price","type":"uint256"},{"indexed":false,"name":"success","type":"bool"}],"name":"Subscribed","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dataOwner","type":"address"},{"indexed":true,"name":"dataSourceName","type":"bytes32"},{"indexed":false,"name":"amount","type":"uint256"}],"name":"TransferToProvider","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dataOwner","type":"address"},{"indexed":true,"name":"dataSourceName","type":"bytes32"},{"indexed":false,"name":"amount","type":"uint256"}],"name":"ProviderWithdraw","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dataOwner","type":"address"},{"indexed":true,"name":"dataSourceName","type":"bytes32"},{"indexed":false,"name":"isPunished","type":"bool"}],"name":"ProviderPunishStatus","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"subscriber","type":"address"},{"indexed":true,"name":"dataSourceName","type":"bytes32"},{"indexed":false,"name":"refundAmount","type":"uint256"}],"name":"SubscriberRefund","type":"event"}] \ No newline at end of file diff --git a/catalyst/marketplace/contract_marketplace_address.txt b/catalyst/marketplace/contract_marketplace_address.txt new file mode 100644 index 00000000..86810a5f --- /dev/null +++ b/catalyst/marketplace/contract_marketplace_address.txt @@ -0,0 +1 @@ +0x4fe3d3c2d8c0730d565789ddd3b63e53f342a482 \ No newline at end of file diff --git a/catalyst/marketplace/marketplace.py b/catalyst/marketplace/marketplace.py new file mode 100644 index 00000000..61ed7fec --- /dev/null +++ b/catalyst/marketplace/marketplace.py @@ -0,0 +1,673 @@ +import glob +import json +import os +import re +import shutil +import sys +import time + +import bcolz +import logbook +import pandas as pd +import requests +import six +from requests_toolbelt import MultipartDecoder +from requests_toolbelt.multipart.decoder import \ + NonMultipartContentTypeException +from web3 import Web3, HTTPProvider + +from catalyst.constants import ( + LOG_LEVEL, AUTH_SERVER, ETH_REMOTE_NODE, MARKETPLACE_CONTRACT, + MARKETPLACE_CONTRACT_ABI, ENIGMA_CONTRACT, ENIGMA_CONTRACT_ABI) +from catalyst.exchange.utils.stats_utils import set_print_settings +from catalyst.marketplace.marketplace_errors import ( + MarketplacePubAddressEmpty, MarketplaceDatasetNotFound, + MarketplaceNoAddressMatch, MarketplaceHTTPRequest, + MarketplaceNoCSVFiles, MarketplaceContractDataNoMatch, + MarketplaceSubscriptionExpired) +from catalyst.marketplace.utils.auth_utils import get_key_secret, \ + get_signed_headers +from catalyst.marketplace.utils.bundle_utils import merge_bundles +from catalyst.marketplace.utils.eth_utils import bytes32, b32_str, bin_hex +from catalyst.marketplace.utils.path_utils import get_bundle_folder, \ + get_data_source_folder, get_marketplace_folder, \ + get_user_pubaddr, get_temp_bundles_folder, extract_bundle + +if sys.version_info.major < 3: + import urllib +else: + import urllib.request as urllib + +log = logbook.Logger('Marketplace', level=LOG_LEVEL) + + +class Marketplace: + def __init__(self): + + self.addresses = get_user_pubaddr() + + if self.addresses[0]['pubAddr'] == '': + raise MarketplacePubAddressEmpty( + filename=os.path.join( + get_marketplace_folder(), 'addresses.json') + ) + self.default_account = self.addresses[0]['pubAddr'] + + self.web3 = Web3(HTTPProvider(ETH_REMOTE_NODE)) + + contract_url = urllib.urlopen(MARKETPLACE_CONTRACT) + + self.mkt_contract_address = Web3.toChecksumAddress( + contract_url.readline().strip()) + + abi_url = urllib.urlopen(MARKETPLACE_CONTRACT_ABI) + abi = json.load(abi_url) + + self.mkt_contract = self.web3.eth.contract( + self.mkt_contract_address, + abi=abi, + ) + + contract_url = urllib.urlopen(ENIGMA_CONTRACT) + + self.eng_contract_address = Web3.toChecksumAddress( + contract_url.readline().strip()) + + abi_url = urllib.urlopen(ENIGMA_CONTRACT_ABI) + abi = json.load(abi_url) + + self.eng_contract = self.web3.eth.contract( + self.eng_contract_address, + abi=abi, + ) + + # def get_data_sources_map(self): + # return [ + # dict( + # name='Marketcap', + # desc='The marketcap value in USD.', + # start_date=pd.to_datetime('2017-01-01'), + # end_date=pd.to_datetime('2018-01-15'), + # data_frequencies=['daily'], + # ), + # dict( + # name='GitHub', + # desc='The rate of development activity on GitHub.', + # start_date=pd.to_datetime('2017-01-01'), + # end_date=pd.to_datetime('2018-01-15'), + # data_frequencies=['daily', 'hour'], + # ), + # dict( + # name='Influencers', + # desc='Tweets & related sentiments by selected influencers.', + # start_date=pd.to_datetime('2017-01-01'), + # end_date=pd.to_datetime('2018-01-15'), + # data_frequencies=['daily', 'hour', 'minute'], + # ), + # ] + + def choose_pubaddr(self): + + if len(self.addresses) == 1: + address = self.addresses[0]['pubAddr'] + address_i = 0 + print('Using {} for this transaction.'.format(address)) + else: + while True: + for i in range(0, len(self.addresses)): + print('{}\t{}\t{}'.format( + i, + self.addresses[i]['pubAddr'], + self.addresses[i]['desc']) + ) + address_i = int(input('Choose your address associated with ' + 'this transaction: [default: 0] ') or 0) + if not (0 <= address_i < len(self.addresses)): + print('Please choose a number between 0 and {}\n'.format( + len(self.addresses) - 1)) + else: + address = Web3.toChecksumAddress( + self.addresses[address_i]['pubAddr']) + break + + return address, address_i + + def sign_transaction(self, from_address, tx): + + print('\nVisit https://www.myetherwallet.com/#offline-transaction and ' + 'enter the following parameters:\n\n' + 'From Address:\t\t{_from}\n' + 'To Address:\t\t{to}\n' + 'Value / Amount to Send:\t{value}\n' + 'Gas Limit:\t\t{gas}\n' + 'Nonce:\t\t\t{nonce}\n' + 'Data:\t\t\t{data}\n'.format( + _from=from_address, + to=tx['to'], + value=tx['value'], + gas=tx['gas'], + nonce=tx['nonce'], + data=tx['data'], + ) + ) + + signed_tx = input('Copy and Paste the "Signed Transaction" ' + 'field here:\n') + + if signed_tx.startswith('0x'): + signed_tx = signed_tx[2:] + + return signed_tx + + def check_transaction(self, tx_hash): + + if 'ropsten' in ETH_REMOTE_NODE: + etherscan = 'https://ropsten.etherscan.io/tx/{}'.format( + tx_hash) + else: + etherscan = 'https://etherscan.io/tx/{}'.format(tx_hash) + + print('\nYou can check the outcome of your transaction here:\n' + '{}\n\n'.format(etherscan)) + + def get_data_source_def(self, data_source_name): + data_source_name = data_source_name.lower() + dsm = self.get_data_sources_map() + + ds = six.next( + (d for d in dsm if d['name'].lower() == data_source_name), None + ) + return ds + + def list(self): + + data_sources = self.mkt_contract.functions.getAllProviders().call() + + data = [] + for index, data_source in enumerate(data_sources): + if index > 0: + data.append( + dict( + dataset=data_source.decode('utf-8').rstrip('\0') + ) + ) + + df = pd.DataFrame(data) + set_print_settings() + print(df) + + def subscribe(self, dataset): + dataset = dataset.lower() + + address = self.choose_pubaddr()[0] + provider_info = self.mkt_contract.functions.getDataProviderInfo( + bytes32(dataset) + ).call() + + if not provider_info[4]: + print('The requested "{}" dataset is not registered in ' + 'the Data Marketplace.'.format(dataset)) + return + + price = provider_info[1] + + print('\nThe price for a monthly subscription to this dataset is' + ' {} ENG'.format(price)) + + print('Checking that the ENG balance in {} is greater than ' + '{} ENG... '.format(address, price), end='') + + balance = self.web3.eth.call({ + 'from': address, + 'to': self.eng_contract_address, + 'data': '0x70a08231000000000000000000000000{}'.format( + address[2:]) + }) + try: + balance = int(balance[2:], 16) // 10 ** 8 + except ValueError: + balance = int(bin_hex(balance), 16) // 10 ** 8 + + if balance > price: + print('OK.') + else: + print('FAIL.\n\nAddress {} balance is {} ENG,\nwhich is lower ' + 'than the price of the dataset that you are trying to\n' + 'buy: {} ENG. Get enough ENG to cover the costs of the ' + 'monthly\nsubscription for what you are trying to buy, ' + 'and try again.'.format(address, balance, price)) + return + + while True: + agree_pay = input('Please confirm that you agree to pay {} ENG ' + 'for a monthly subscription to the dataset "{}" ' + 'starting today. [default: Y] '.format( + price, dataset)) or 'y' + if agree_pay.lower() not in ('y', 'n'): + print("Please answer Y or N.") + else: + if agree_pay.lower() == 'y': + break + else: + return + + print('Ready to subscribe to dataset {}.\n'.format(dataset)) + print('In order to execute the subscription, you will need to sign ' + 'two different transactions:\n' + '1. First transaction is to authorize the Marketplace contract ' + 'to spend {} ENG on your behalf.\n' + '2. Second transaction is the actual subscription for the ' + 'desired dataset'.format(price)) + + tx = self.eng_contract.functions.approve( + self.mkt_contract_address, + price, + ).buildTransaction( + {'nonce': self.web3.eth.getTransactionCount(address)}) + + if 'ropsten' in ETH_REMOTE_NODE: + tx['gas'] = min(int(tx['gas'] * 1.5), 4700000) + + signed_tx = self.sign_transaction(address, tx) + + try: + tx_hash = '0x{}'.format(bin_hex( + self.web3.eth.sendRawTransaction(signed_tx))) + print('\nThis is the TxHash for this transaction: ' + '{}'.format(tx_hash)) + + except Exception as e: + print('Unable to subscribe to data source: {}'.format(e)) + return + + self.check_transaction(tx_hash) + + print('Waiting for the first transaction to succeed...') + + while True: + try: + if self.web3.eth.getTransactionReceipt(tx_hash).status: + break + else: + print('\nTransaction failed. Aborting...') + return + except AttributeError: + pass + for i in range(0, 10): + print('.', end='', flush=True) + time.sleep(1) + + print('\nFirst transaction successful!\n' + 'Now processing second transaction.') + + tx = self.mkt_contract.functions.subscribe( + bytes32(dataset), + ).buildTransaction( + {'nonce': self.web3.eth.getTransactionCount(address)}) + + if 'ropsten' in ETH_REMOTE_NODE: + tx['gas'] = min(int(tx['gas'] * 1.5), 4700000) + + signed_tx = self.sign_transaction(address, tx) + + try: + tx_hash = '0x{}'.format(bin_hex( + self.web3.eth.sendRawTransaction(signed_tx))) + print('\nThis is the TxHash for this transaction: ' + '{}'.format(tx_hash)) + + except Exception as e: + print('Unable to subscribe to data source: {}'.format(e)) + return + + if 'ropsten' in ETH_REMOTE_NODE: + etherscan = 'https://ropsten.etherscan.io/tx/{}'.format( + tx_hash) + else: + etherscan = 'https://etherscan.io/tx/{}'.format(tx_hash) + + print('You can check the outcome of your transaction here:\n' + '{}'.format(etherscan)) + + print('Waiting for the second transaction to succeed...') + + while True: + try: + if self.web3.eth.getTransactionReceipt(tx_hash).status: + break + else: + print('\nTransaction failed. Aborting...') + return + except AttributeError: + pass + for i in range(0, 10): + print('.', end='', flush=True) + time.sleep(1) + + print('\nSecond transaction successful!\n' + 'You have successfully subscribed to dataset {} with' + 'address {}.\n' + 'You can now ingest this dataset anytime during the ' + 'next month by running the following command:\n' + 'catalyst marketplace ingest --dataset={}'.format( + dataset, address, dataset)) + + def process_temp_bundle(self, ds_name, path): + """ + Merge the temp bundle into the main bundle for the specified + data source. + + Parameters + ---------- + ds_name + path + + Returns + ------- + + """ + tmp_bundle = extract_bundle(path) + bundle_folder = get_data_source_folder(ds_name) + if os.listdir(bundle_folder): + zsource = bcolz.ctable(rootdir=tmp_bundle, mode='r') + ztarget = bcolz.ctable(rootdir=bundle_folder, mode='r') + merge_bundles(zsource, ztarget) + + else: + os.rename(tmp_bundle, bundle_folder) + + pass + + def ingest(self, ds_name, start=None, end=None, force_download=False): + + ds_name = ds_name.lower() + provider_info = self.mkt_contract.functions.getDataProviderInfo( + bytes32(ds_name) + ).call() + + if not provider_info[4]: + print('The requested "{}" dataset is not registered in ' + 'the Data Marketplace.'.format(ds_name)) + return + + address, address_i = self.choose_pubaddr() + check_sub = self.mkt_contract.functions.checkAddressSubscription( + address, bytes32(ds_name) + ).call() + + if check_sub[0] != address or b32_str(check_sub[1]) != ds_name: + raise MarketplaceContractDataNoMatch( + params='address: {}, dataset: {}'.format( + address, ds_name + ) + ) + + if not check_sub[5]: + raise MarketplaceSubscriptionExpired( + dataset=ds_name, + date=check_sub[4], + ) + + if 'key' in self.addresses[address_i]: + key = self.addresses[address_i]['key'] + secret = self.addresses[address_i]['secret'] + else: + # TODO: Verify signature to obtain key/secret pair + key, secret = get_key_secret(address, ds_name) + + headers = get_signed_headers(ds_name, key, secret) + log.debug('Starting download of dataset for ingestion...') + r = requests.post( + '{}/marketplace/ingest'.format(AUTH_SERVER), + headers=headers, + stream=True, + ) + if r.status_code == 200: + target_path = get_temp_bundles_folder() + try: + decoder = MultipartDecoder.from_response(r) + for part in decoder.parts: + h = part.headers[b'Content-Disposition'].decode('utf-8') + # Extracting the filename from the header + name = re.search(r'filename="(.*)"', h).group(1) + + filename = os.path.join(target_path, name) + with open(filename, 'wb') as f: + # for chunk in part.content.iter_content( + # chunk_size=1024): + # if chunk: # filter out keep-alive new chunks + # f.write(chunk) + f.write(part.content) + + self.process_temp_bundle(ds_name, filename) + + except NonMultipartContentTypeException: + response = r.json() + raise MarketplaceHTTPRequest( + request='ingest dataset', + error=response, + ) + else: + raise MarketplaceHTTPRequest( + request='ingest dataset', + error=r.status_code, + ) + + log.info('{} ingested successfully'.format(ds_name)) + + def get_data_source(self, data_source_name, data_frequency=None, + start=None, end=None): + data_source_name = data_source_name.lower() + + if data_frequency is None: + ds_def = self.get_data_source_def(data_source_name) + freqs = ds_def['data_frequencies'] + data_frequency = freqs[0] + + if len(freqs) > 1: + log.warn( + 'no data frequencies specified for data source {}, ' + 'selected the first one by default: {}'.format( + data_source_name, data_frequency + ) + ) + + # TODO: filter ctable by start and end date + bundle_folder = get_bundle_folder(data_source_name, data_frequency) + z = bcolz.ctable(rootdir=bundle_folder, mode='r') + + df = z.todataframe() # type: pd.DataFrame + df.set_index(['date', 'symbol'], drop=False, inplace=True) + + if start and end is None: + df = df.xs(start, level=0) + + return df + + def clean(self, data_source_name, data_frequency=None): + data_source_name = data_source_name.lower() + + if data_frequency is None: + folder = get_data_source_folder(data_source_name) + + else: + folder = get_bundle_folder(data_source_name, data_frequency) + + shutil.rmtree(folder) + pass + + def create_metadata(self, key, secret, ds_name, data_frequency, desc, + has_history=True, has_live=True): + """ + + Returns + ------- + + """ + headers = get_signed_headers(ds_name, key, secret) + r = requests.post( + '{}/marketplace/register'.format(AUTH_SERVER), + json=dict( + ds_name=ds_name, + desc=desc, + data_frequency=data_frequency, + has_history=has_history, + has_live=has_live, + ), + headers=headers, + ) + + if r.status_code != 200: + raise MarketplaceHTTPRequest( + request='register', error=r.status_code + ) + + if 'error' in r.json(): + raise MarketplaceHTTPRequest( + request='upload file', error=r.json()['error'] + ) + + def register(self): + while True: + desc = input('Enter the name of the dataset to register: ') + dataset = desc.lower() + provider_info = self.mkt_contract.functions.getDataProviderInfo( + bytes32(dataset) + ).call() + + if provider_info[4]: + print('There is already a dataset registered under ' + 'the name "{}". Please choose a different ' + 'name.'.format(dataset)) + else: + break + + price = int( + input( + 'Enter the price for a monthly subscription to ' + 'this dataset in ENG: ' + ) + ) + while True: + freq = input('Enter the data frequency [daily, hourly, minute]: ') + if freq.lower() not in ('daily', 'hourly', 'minute'): + print('Not a valid frequency.') + else: + break + + while True: + reg_pub = input( + 'Does it include historical data? [default: Y]: ' + ) or 'y' + if reg_pub.lower() not in ('y', 'n'): + print('Please answer Y or N.') + else: + if reg_pub.lower() == 'y': + has_history = True + else: + has_history = False + break + + while True: + reg_pub = input( + 'Doest it include live data? [default: Y]: ' + ) or 'y' + if reg_pub.lower() not in ('y', 'n'): + print('Please answer Y or N.') + else: + if reg_pub.lower() == 'y': + has_live = True + else: + has_live = False + break + + address, address_i = self.choose_pubaddr() + if 'key' in self.addresses[address_i]: + key = self.addresses[address_i]['key'] + secret = self.addresses[address_i]['secret'] + else: + # TODO: Verify signature to obtain key/secret pair + key, secret = get_key_secret(address, dataset) + + tx = self.mkt_contract.functions.register( + bytes32(dataset), + price, + address, + ).buildTransaction( + {'nonce': self.web3.eth.getTransactionCount(address)} + ) + + if 'ropsten' in ETH_REMOTE_NODE: + tx['gas'] = min(int(tx['gas'] * 1.5), 4700000) + + signed_tx = self.sign_transaction(address, tx) + tx_hash = '0x{}'.format( + bin_hex(self.web3.eth.sendRawTransaction(signed_tx)) + ) + print('\nThis is the TxHash for this transaction: {}'.format(tx_hash)) + + self.check_transaction(tx_hash) + + print('\nWarming up the {} dataset'.format(dataset)) + self.create_metadata( + key=key, + secret=secret, + ds_name=dataset, + data_frequency=freq, + desc=desc, + has_history=has_history, + has_live=has_live, + ) + print('\n{} registered successfully'.format(dataset)) + + def publish(self, dataset, datadir, watch): + dataset = dataset.lower() + provider_info = self.mkt_contract.functions.getDataProviderInfo( + bytes32(dataset) + ).call() + + if not provider_info[4]: + raise MarketplaceDatasetNotFound(dataset=dataset) + + match = next( + (l for l in self.addresses if l['pubAddr'] == provider_info[0]), + None + ) + if not match: + raise MarketplaceNoAddressMatch( + dataset=dataset, + address=provider_info[0]) + + print('Using address: {} to publish this dataset.'.format( + provider_info[0])) + + if 'key' in match: + key = match['key'] + secret = match['secret'] + else: + # TODO: Verify signature to obtain key/secret pair + key, secret = get_key_secret(provider_info[0], dataset) + + headers = get_signed_headers(dataset, key, secret) + filenames = glob.glob(os.path.join(datadir, '*.csv')) + + if not filenames: + raise MarketplaceNoCSVFiles(datadir=datadir) + + files = [] + for file in filenames: + files.append(('file', open(file, 'rb'))) + + r = requests.post('{}/marketplace/publish'.format(AUTH_SERVER), + files=files, + headers=headers) + + if r.status_code != 200: + raise MarketplaceHTTPRequest(request='upload file', + error=r.status_code) + + if 'error' in r.json(): + raise MarketplaceHTTPRequest(request='upload file', + error=r.json()['error']) + + print('Dataset {} uploaded successfully.'.format(dataset)) diff --git a/catalyst/marketplace/marketplace_errors.py b/catalyst/marketplace/marketplace_errors.py new file mode 100644 index 00000000..ed3d0fa4 --- /dev/null +++ b/catalyst/marketplace/marketplace_errors.py @@ -0,0 +1,68 @@ +import sys +import traceback + +from catalyst.errors import ZiplineError + + +def silent_except_hook(exctype, excvalue, exctraceback): + if exctype in [MarketplacePubAddressEmpty, MarketplaceDatasetNotFound, + MarketplaceNoAddressMatch, MarketplaceHTTPRequest, + MarketplaceNoCSVFiles, MarketplaceContractDataNoMatch, + MarketplaceSubscriptionExpired]: + fn = traceback.extract_tb(exctraceback)[-1][0] + ln = traceback.extract_tb(exctraceback)[-1][1] + print("Error traceback: {1} (line {2})\n" + "{0.__name__}: {3}".format(exctype, fn, ln, excvalue)) + else: + sys.__excepthook__(exctype, excvalue, exctraceback) + + +sys.excepthook = silent_except_hook + + +class MarketplacePubAddressEmpty(ZiplineError): + msg = ( + 'Please enter your public address to use in the Data Marketplace ' + 'in the following file: {filename}' + ).strip() + + +class MarketplaceDatasetNotFound(ZiplineError): + msg = ( + 'The dataset "{dataset}" is not registered in the Data Marketplace.' + ).strip() + + +class MarketplaceNoAddressMatch(ZiplineError): + msg = ( + 'The address registered with the dataset {dataset}: {address} ' + 'does not match any of your addresses.' + ).strip() + + +class MarketplaceHTTPRequest(ZiplineError): + msg = ( + 'Request to remote server to {request} failed: {error}' + ).strip() + + +class MarketplaceNoCSVFiles(ZiplineError): + msg = ( + 'No CSV files found on {datadir} to upload.' + ) + + +class MarketplaceContractDataNoMatch(ZiplineError): + msg = ( + 'The information found on the contract does not match the ' + 'requested data:\n{params}.' + ) + + +class MarketplaceSubscriptionExpired(ZiplineError): + msg = ( + 'Your subscription to dataset "{dataset}" expired on {date} ' + 'and is no longer active. You have to subscribe again running the ' + 'following command:\n' + 'catalyst marketplace subscribe --dataset={dataset}' + ) diff --git a/catalyst/marketplace/utils/__init__.py b/catalyst/marketplace/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/catalyst/marketplace/utils/auth_utils.py b/catalyst/marketplace/utils/auth_utils.py new file mode 100644 index 00000000..d761cf8d --- /dev/null +++ b/catalyst/marketplace/utils/auth_utils.py @@ -0,0 +1,82 @@ +import hashlib +import hmac + +import requests +import time + +from catalyst.marketplace.marketplace_errors import ( + MarketplaceHTTPRequest) +from catalyst.marketplace.utils.path_utils import ( + get_user_pubaddr, save_user_pubaddr) +from catalyst.constants import AUTH_SERVER + + +def get_key_secret(pubAddr, dataset): + """ + Obtain a new key/secret pair from authentication server + + Parameters + ---------- + pubAddr: str + dataset: str + + Returns + ------- + key: str + secret: str + + """ + session = requests.Session() + response = session.get('{}/marketplace/getkeysecret'.format(AUTH_SERVER), + headers={ + 'pubAddr': pubAddr, + 'dataset': dataset}) + + if 'error' in response.json(): + raise MarketplaceHTTPRequest(request=str('obtain key/secret'), + error=str(response.json()['error'])) + + addresses = get_user_pubaddr() + + match = next((l for l in addresses if + l['pubAddr'] == pubAddr), None) + match['key'] = response.json()['key'] + match['secret'] = response.json()['secret'] + + addresses[addresses.index(match)] = match + + save_user_pubaddr(addresses) + + return match['key'], match['secret'] + + +def get_signed_headers(ds_name, key, secret): + """ + Return a new request header including the key / secret signature + + Parameters + ---------- + ds_name + key + secret + + Returns + ------- + + """ + nonce = str(int(time.time())) + + signature = hmac.new( + secret.encode('utf-8'), + '{}{}'.format(ds_name, nonce).encode('utf-8'), + hashlib.sha512 + ).hexdigest() + + headers = { + 'Sign': signature, + 'Key': key, + 'Nonce': nonce, + 'Dataset': ds_name, + } + + return headers diff --git a/catalyst/marketplace/utils/bundle_utils.py b/catalyst/marketplace/utils/bundle_utils.py new file mode 100644 index 00000000..b58595ac --- /dev/null +++ b/catalyst/marketplace/utils/bundle_utils.py @@ -0,0 +1,36 @@ +import os +import shutil + +import bcolz + + +def merge_bundles(zsource, ztarget): + """ + Merge + Parameters + ---------- + zsource + ztarget + + Returns + ------- + + """ + # TODO: find a way to do this iteratively instead of in-memory + df_source = zsource.todataframe() + df_source.set_index('date', drop=False, inplace=True) + df_target = ztarget.todataframe() + df_target.set_index('date', drop=False, inplace=True) + + df = df_target.merge( + right=df_source, + how='right', + ) # type: pd.DataFrame + + dirname = os.path.basename(ztarget.rootdir) + bak_dir = ztarget.rootdir.replace(dirname, '.{}'.format(dirname)) + os.rename(ztarget.rootdir, bak_dir) + + z = bcolz.ctable.fromdataframe(df=df, rootdir=ztarget.rootdir) + shutil.rmtree(bak_dir) + return z diff --git a/catalyst/marketplace/utils/eth_utils.py b/catalyst/marketplace/utils/eth_utils.py new file mode 100644 index 00000000..f5fe8b7b --- /dev/null +++ b/catalyst/marketplace/utils/eth_utils.py @@ -0,0 +1,50 @@ +import binascii + + +def bytes32(string): + """ + Convert string to bytes32 data type for smart contract + + Parameters + ---------- + string: str + + Returns + ------- + list + + """ + return binascii.hexlify(string.encode('utf-8')) + + +def b32_str(bytes32): + """ + Convert bytes32 to string + + Parameters + ---------- + input: bytes object + + Returns + ------- + str + + """ + return binascii.unhexlify( + bytes32.decode('utf-8').rstrip('\0')).decode('ascii') + + +def bin_hex(binary): + """ + Convert bytes32 to string + + Parameters + ---------- + input: bytes object + + Returns + ------- + str + + """ + return binascii.hexlify(binary).decode('utf-8') diff --git a/catalyst/marketplace/utils/path_utils.py b/catalyst/marketplace/utils/path_utils.py new file mode 100644 index 00000000..6565b49d --- /dev/null +++ b/catalyst/marketplace/utils/path_utils.py @@ -0,0 +1,162 @@ +import os +import json +import tarfile + +from catalyst.utils.deprecate import deprecated +from catalyst.utils.paths import data_root, ensure_directory + + +def get_marketplace_folder(environ=None): + """ + The root path of the marketplace folder. + + Parameters + ---------- + environ: + + Returns + ------- + str + + """ + if not environ: + environ = os.environ + + root = data_root(environ) + marketplace_folder = os.path.join(root, 'marketplace') + ensure_directory(marketplace_folder) + + return marketplace_folder + + +def get_data_source_folder(data_source_name, environ=None): + """ + The root path of an data_source folder. + + Parameters + ---------- + data_source_name: str + environ: + + Returns + ------- + str + + """ + if not environ: + environ = os.environ + + root = data_root(environ) + data_source_folder = os.path.join(root, 'marketplace', data_source_name) + ensure_directory(data_source_folder) + + return data_source_folder + + +@deprecated +def get_bundle_folder(data_source_name, data_frequency, environ=None): + data_source_folder = get_data_source_folder(data_source_name, environ) + + bundle_folder = os.path.join(data_source_folder, data_frequency) + + ensure_directory(bundle_folder) + + return bundle_folder + + +def get_temp_bundles_folder(environ=None): + """ + The temp folder for bundle downloads by algo name. + + Parameters + ---------- + ds_name: str + environ: + + Returns + ------- + str + + """ + root = data_root(environ) + folder = os.path.join(root, 'marketplace', 'temp_bundles') + ensure_directory(folder) + + return folder + + +def extract_bundle(tar_filename): + """ + Extract a bcolz bundle. + + Parameters + ---------- + ds_name + + Returns + ------- + str + + """ + target_path = tar_filename.replace('.tar.gz', '') + with tarfile.open(tar_filename, 'r') as tar: + tar.extractall(target_path) + + return target_path + + +def get_user_pubaddr(environ=None): + """ + The de-serialized contend of the user's addresses.json file. + + Parameters + ---------- + environ: + + Returns + ------- + Object + + """ + marketplace_folder = get_marketplace_folder(environ) + filename = os.path.join(marketplace_folder, 'addresses.json') + + if os.path.isfile(filename): + with open(filename) as data_file: + data = json.load(data_file) + try: + d = data[0]['pubAddr'] + except Exception as e: + return [data, ] + return data + else: + data = [] + data.append(dict(pubAddr='', desc='')) + with open(filename, 'w') as f: + json.dump(data, f, sort_keys=False, indent=2, + separators=(',', ':')) + return data + + +def save_user_pubaddr(data, environ=None): + """ + Saves the user's public addresses and their related metadata in + the corresponding addresses.json file. + + Parameters + ---------- + data: dict + + Returns + ------- + True + + """ + marketplace_folder = get_marketplace_folder(environ) + filename = os.path.join(marketplace_folder, 'addresses.json') + + with open(filename, 'w') as f: + json.dump(data, f, sort_keys=False, indent=2, + separators=(',', ':')) + + return True diff --git a/etc/requirements_marketplace.txt b/etc/requirements_marketplace.txt new file mode 100644 index 00000000..2a56c200 --- /dev/null +++ b/etc/requirements_marketplace.txt @@ -0,0 +1,2 @@ +web3==4.0.0b7 +requests-toolbelt==0.8.0 diff --git a/tests/marketplace/__init__.py b/tests/marketplace/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/marketplace/test_marketplace.py b/tests/marketplace/test_marketplace.py new file mode 100644 index 00000000..d2511ce8 --- /dev/null +++ b/tests/marketplace/test_marketplace.py @@ -0,0 +1,35 @@ +from catalyst.marketplace.marketplace import Marketplace +from catalyst.testing.fixtures import WithLogger, ZiplineTestCase +import pandas as pd + + +class TestMarketplace(WithLogger, ZiplineTestCase): + def test_list(self): + marketplace = Marketplace() + marketplace.list() + pass + + def test_register(self): + marketplace = Marketplace() + marketplace.register() + pass + + def test_subscribe(self): + marketplace = Marketplace() + marketplace.subscribe('marketcap1') + pass + + def test_ingest(self): + marketplace = Marketplace() + ds_def = marketplace.ingest('marketcap1') + pass + + def test_publish(self): + marketplace = Marketplace() + datadir = '/Users/fredfortier/Downloads/marketcap_test_single' + marketplace.publish('marketcap1', datadir, False) + + def test_clean(self): + marketplace = Marketplace() + marketplace.clean('marketcap') + pass