Source code for solitude.client.eth_client

# Copyright (c) 2019, Solitude Developers
#
# This source code is licensed under the BSD-3-Clause license found in the
# COPYING file in the root directory of this source tree

from typing import Union, List, Dict, Tuple, Optional  # noqa
import binascii
import fnmatch
import time
import warnings
import itertools
from collections import namedtuple
from solitude._internal import RaiseForParam, type_assert, value_assert

# import web3 and suppress the warnings it generates
with warnings.catch_warnings():  # noqa
    warnings.simplefilter("ignore")  # noqa
    from web3 import Web3
    from web3.utils.events import get_event_data
    import web3.contract

from solitude.common.errors import SetupError
from solitude.common import (
    ContractObjectList, TransactionInfo, hex_repr, Dump)

from solitude.common import RPCClient
from solitude.client.contract import ContractBase


class EventCaptureContext:
    def __init__(self):
        super().__init__()
        self._event_filter_stack = []

    def _push_filter(self, pattern):
        self._event_filter_stack.append(pattern)

    def _pop_filter(self):
        del self._event_filter_stack[-1]

    def _check_filters(self, text: str):
        for flt in self._event_filter_stack:
            if isinstance(flt, str):
                if fnmatch.fnmatch(text, flt):
                    return True
            else:
                if flt.match(text) is not None:
                    return True
        return False


class AccountContext:
    def __init__(self):
        super().__init__()
        self._account_stack = []  # type: List[Union[str, int]]

    def _push_account(self, name: Union[str, int]):
        self._account_stack.append(name)

    def _pop_account(self):
        del self._account_stack[-1]

    def _get_account(self) -> Union[str, int]:
        try:
            return self._account_stack[-1]
        except IndexError:
            value_assert(False, "No account in current scope")


class AccountWithStatement:
    def __init__(self, ctx: AccountContext, name: Union[str, int]):
        self._ctx = ctx
        self._name = name

    def __enter__(self):
        self._ctx._push_account(self._name)
        return self

    def __exit__(self, _type, value, traceback):
        self._ctx._pop_account()


class EventCaptureWithStatement:
    def __init__(self, ctx: EventCaptureContext, flt):
        self._ctx = ctx
        self._flt = flt

    def __enter__(self):
        self._ctx._push_filter(self._flt)
        return self

    def __exit__(self, _type, value, traceback):
        self._ctx._pop_filter()


EventAbi = namedtuple("EventAbi", ["unitname", "contractname", "name", "signature", "abi"])

EventLog = namedtuple("EventLog", ["unitname", "contractname", "name", "address", "args", "data"])
EventLog.__doc__ = "Event information"
EventLog.unitname.__doc__ = "Source unit of the contract which contains the event definition"
EventLog.contractname.__doc__ = "Contract which contains the event definition"
EventLog.name.__doc__ = "Event name"
EventLog.address.__doc__ = "Address of the contract instance which produced the event"
EventLog.args.__doc__ = "Arguments of the emitted event"
EventLog.data.__doc__ = "Raw event data from web3"

Filter = namedtuple("Filter", ["index", "unitname", "contractname", "event_names", "valid"])
Filter.__doc__ = "Filter information"
Filter.index.__doc__ = "Index of the filter on the ETH node"
Filter.unitname.__doc__ = "Source unit of the contract which contains the event definition"
Filter.contractname.__doc__ = "Contract which contains the event definition"
Filter.event_names.__doc__ = "Names of the events to filter"
Filter.valid.__doc__ = "Whether the filter is still valid and it should keep being used"

ZERO_ADDRESS = hex_repr(b"", pad=20, prefix=True)


[docs]class ETHClient(AccountContext, EventCaptureContext): """ The ethereum node client object allows to communicate with an ethereum node. It is mainly used to produce contract objects which allow to interact with a contract instance on the blockchain. It stores a collection of contracts, their ABI and optionally their bytecode. """
[docs] def __init__(self, endpoint: str): """Initialize a new ETH client without any contract. :param endpoint: URL of the ethereum server node """ super().__init__() self._endpoint = endpoint self._web3 = Web3(Web3.HTTPProvider(self._endpoint)) self._rpc = RPCClient(endpoint=endpoint) self._compiled = ContractObjectList() self._dump = Dump(fileobj=None) self._default_gaslimit = None self._default_gasprice = None # accounts self._account_aliases = {} # type: Dict[str, int] self._reload_accounts() self._initial_default_account = self._web3.eth.defaultAccount # collect contracts and events self._events = [] # type: List[EventAbi] self._event_logs = [] # type: List[EventLog] self._event_map = {} # type: Dict[Tuple[str, bytes], EventAbi] self._filters = [] # type: List[Filter]
[docs] def update_contracts(self, contracts: ContractObjectList): """Update the collection of contracts known to this client :param contracts: a collection of contracts (see ContractObjectList) """ self._compiled.update(contracts) self._update_events_index(contracts)
def _update_events_index(self, compiled: ContractObjectList): for (unitname, contractname), contract in compiled.contracts.items(): for abi in contract['abi']: if abi.get("type") == "event": event_selector = "{name}({params})".format( name=abi["name"], params=",".join([inp["type"] for inp in abi["inputs"]])) event = EventAbi( unitname, contractname, name=abi["name"], signature=bytes(self._web3.sha3(text=event_selector)), abi=abi) self._events.append(event) key = (event.unitname, event.contractname, event.signature) self._event_map[key] = event def _reload_accounts(self): self._accounts = list(self._web3.eth.accounts)
[docs] def set_default_gaslimit(self, gas: Optional[int]): """Set the default gas limit for transactions :param gas: default gas limit, or None. If the gas limit is not set either through the default or explicitly in the transaction, web3 will call eth.estimateGas first to determine this value. """ self._default_gaslimit = gas
[docs] def set_default_gasprice(self, gasprice: Optional[int]): """Set the default gas price for transactions :param gasprice: default gas limit, or None. If the gas price is not set, web3 will call eth.gasPrice first to determine this value. """ self._default_gasprice = gasprice
[docs] def get_accounts(self, reload=False) -> list: """Get the accounts stored in the ETH node :param reload: whether to refresh the account list by querying the node :retrurn: list of accounts """ # TODO review the _reload_accounts implementation and also provide # the possibility to 'unlock' accounts if reload: self._reload_accounts() return self._accounts[:]
@property def rpc(self) -> RPCClient: """A raw JSON-RPC client instance to communicate with the ETH node """ return self._rpc @property def web3(self): """A raw web3 library client instance connected to the ETH node """ return self._web3 @property def contracts(self) -> ContractObjectList: """The collection of all contracts known by this client, as a ContractObjectList object """ return self._compiled
[docs] def mine_block(self) -> None: """Ask the ETH node to mine a new block """ self._rpc.evm_mine()
[docs] def increase_blocktime_offset(self, seconds: int) -> int: """Increase the offset to apply to block.timestamp for newly mined blocks :param seconds: number of seconds to add to block.timestamp offset (in seconds) :return: new block.timestamp offset (in seconds) """ response = self._rpc.evm_increaseTime(seconds) return response
[docs] def get_last_blocktime(self) -> int: """Get timestamp of last mined block :return: last block's timestamp (in seconds) """ time_hex = self._rpc.eth_getBlockByNumber('latest', True)['timestamp'] # type: str assert time_hex.startswith("0x") return int(time_hex[2:], 16)
[docs] def capture(self, pattern): r"""Enter a context which captures events emitted by transactions. :param pattern: a glob pattern string, or a regex object, to match the event name :return: a capture context The event name is in the format: ``{unitname}:{contractname}.{eventname}`` All emitted events that match are stored within the client and can be accessed with ``client.get_events()``. They are cleared before every capture. Nested captures will filter events that match any of the patterns in the context. .. code-block:: python with client.capture("*:MyToken.Transfer"): my_token_instance.transfer(address, 13) assert client.get_events()[0].args[2] == 13 with client.capture(re.compile(r".*:MyToken\.Transfer")): my_token_instance.transfer(address, 1) assert client.get_events()[0].args[2] == 1 """ return EventCaptureWithStatement(self, pattern)
def _on_transaction(self, info: TransactionInfo): # reporting self._dump("{contract}[{address}]".format( contract=info.contractname, address=info.address)) with self._dump.push(" "): self._dump("Call: {function}({fnargs}), {txargs}".format( function=info.function, fnargs=", ".join(repr(x) for x in info.fnargs), txargs=repr(info.txargs))) self._dump("Hash: 0x{txhash}".format( txhash=binascii.hexlify(info.txhash).decode())) self._dump("Cost: {gasused}".format( gasused=info.receipt.gasUsed)) # read events for log in info.receipt.logs: try: event_signature = bytes(log.topics[0]) key = (info.unitname, info.contractname, event_signature) event = self._event_map[key] except KeyError: continue match_friendly_name = event.unitname + ":" + event.contractname + "." + event.name if self._check_filters(match_friendly_name): decoded_log = self._decode_event_log(event, log) self._event_logs.append(decoded_log) def _decode_event_log(self, event: EventAbi, log) -> EventLog: data = get_event_data(event.abi, log) args = [] for inp in event.abi["inputs"]: args.append(data["args"][inp["name"]]) return EventLog( unitname=event.unitname, contractname=event.contractname, name=event.name, address=log["address"], args=args, data=data)
[docs] def account(self, address): """Enter a context which uses a specific account to perform all transactions in the context. :param address: address of the account to use :return: an account context Contexts can be nested. In this case, the account in the last context will be used. .. code-block:: python with client.account(client.address(0)): client.deploy("ContractName", args=()) """ return AccountWithStatement(self, address)
[docs] def address(self, account_id: int): """Get the address of an account in the ETH node :param account_id: index of the account in the ETH node :return: address of the account """ try: return self._accounts[account_id] except (IndexError, KeyError): value_assert(False, "%s is not a valid account" % repr(account_id))
def _push_account(self, name: Union[str, int]): super()._push_account(name) self._web3.eth.defaultAccount = self.get_current_account() def _pop_account(self): super()._pop_account() try: self._web3.eth.defaultAccount = self.get_current_account() except ValueError: self._web3.eth.defaultAccount = self._initial_default_account def _push_filter(self, pattern): if not self._event_filter_stack: self._event_logs = [] super()._push_filter(pattern)
[docs] def get_events(self) -> List[EventLog]: """Get events generated within the last capture context :return: list of event logs """ return [log for log in self._event_logs]
[docs] def clear_events(self) -> None: """Clear events generated within the last capture context """ self._event_logs = []
[docs] def get_current_account(self): """Get the account which is currently in use :return: address of the account in use """ return self._get_account()
[docs] def deploy(self, contract_selector: str, args=(), wrapper=ContractBase): """Deploy a contract :param contract_selector: contract selector string, see :py:meth:`solitude.common.ContractObjectList.select`. The contract must be present in the compiler's collection and must contain ABI and bytecode. :param args: constructor arguments :param wrapper: wrapper class for contract (see ContractBase) """ account = self.get_current_account() compiled_contract = self._compiled.select(contract_selector) unitname = compiled_contract["_solitude"]["unitName"] contractname = compiled_contract["_solitude"]["contractName"] contract = self._web3.eth.contract( abi=compiled_contract['abi'], bytecode=compiled_contract['bin']) txhash = contract.constructor(*args).transact({"from": account}) receipt = self._web3.eth.waitForTransactionReceipt(txhash) # Check whether there is any code in the deployed contract. Sometimes web3 would just produce # an empty contract after unsuccessful deployment, instead of raising an exception. code = self._web3.eth.getCode(receipt.contractAddress) if (not code) or (code == bytes([0])): raise SetupError("Error deploying contract") deployed_contract = self._web3.eth.contract( address=receipt.contractAddress, abi=compiled_contract['abi']) return wrapper(self, unitname, contractname, deployed_contract)
[docs] def use(self, contract_selector: str, address: str, wrapper=ContractBase): """Use a contract at a specific address :param contract_selector: contract selector string, see :py:meth:`solitude.common.ContractObjectList.select`. The contract must be present in the client's collection and must contain the ABI at least. :param args: constructor arguments :param account: deployer account, default is account 0 :param wrapper: wrapper class for contract (see ContractBase) """ compiled_contract = self._compiled.select(contract_selector) unitname = compiled_contract["_solitude"]["unitName"] contractname = compiled_contract["_solitude"]["contractName"] deployed_contract = self._web3.eth.contract( address=address, abi=compiled_contract['abi']) return wrapper(self, unitname, contractname, deployed_contract)
[docs] def add_filter(self, contracts: List[ContractBase], event_names: List[str], parameters=None) -> Filter: """Subscribe to events occurring on the ETH node Creates a filter on the ETH node. Returns an object with the filter information, which can be used to retrieve the events or unsubscribe. :param contracts: list of contract instances which can generate the event. All instances must refer to the same contract, possibly deployed at multiple addresses. :param event_names: names of events to listen for :param parameters: additional raw topics (optional) :return: a Filter object """ unitname = None contractname = None param_address = [] def single_or_list(lst): if len(lst) == 1: return lst[0] return lst for contract in contracts: if contractname is not None and (contract.name != contractname or contract.unitname != unitname): raise SetupError("All contract instances must refer to the same contract") contractname = contract.name unitname = contract.unitname param_address.append(contract.address) param_events = [] for event in self._events: if event.contractname == contractname and event.unitname == unitname and event.name in event_names: param_events.append(hex_repr(event.signature, prefix=True)) param_topics = [single_or_list(param_events)] if parameters is not None: param_topics += parameters params = { "fromBlock": "latest", "toBlock": "latest", "address": single_or_list(param_address), "topics": param_topics} result = self._rpc.eth_newFilter(params) assert result.startswith("0x") flt = Filter( index=int(result[2:], 16), unitname=unitname, contractname=contractname, event_names=event_names, valid=[True]) self._filters.append(flt) return flt
[docs] def remove_filter(self, flt: Filter) -> None: """Unsubscribe from previously created filter. Clean up the filter from the ETH node. :param flt: a Filter object (created by :py:meth:`EthClient.add_filter`) """ for i, saved_filter in enumerate(self._filters): if saved_filter.index == flt.index: del self._filters[i] break if flt.valid: del flt.valid[0] self._rpc.eth_uninstallFilter(hex(flt.index))
[docs] def iter_filters(self, filters: List[Filter], interval=1.0): """Iterate over events generated by a list of filters :param interval: polling interval in seconds :return: an iterator of EventLog objects """ invalid_filters = False while True: for flt in filters: if not flt.valid: invalid_filters = True continue logs = self._web3.eth.getFilterChanges(hex(flt.index)) for log in logs: key = (flt.unitname, flt.contractname, bytes(log["topics"][0])) event = self._event_map[key] decoded_log = self._decode_event_log(event, log) yield decoded_log if invalid_filters: filters = [flt for flt in filters if flt.valid] invalid_filters = False if not filters: return time.sleep(interval)
[docs] def import_raw_key(self, private_key: str, passphrase: str=""): with RaiseForParam("private_key"): value_assert( private_key.startswith("0x"), "must be a hex string prefixed with 0x") account_address = self._web3.personal.importRawKey( private_key, passphrase) if account_address not in self._accounts: self._accounts.append(account_address)
[docs] def unlock_account(self, address: str, passphrase: str="", unlock_duration: int=300): self._web3.personal.unlockAccount(address, passphrase, unlock_duration)
[docs] def miner_start(self, num_threads: int): self._web3.miner.start(num_threads)
[docs]class BatchCaller: """Utility to batch function call requests to the ETH node """
[docs] def __init__(self, client: ETHClient): """Create a BatchCaller :param client: an ETH client """ self.client = client self._data = [] # type: List[tuple] self._calls = [] # type: List[tuple]
[docs] def add_call(self, contract: ContractBase, func: str, args=()) -> None: """Add a function call to the batch call :param contract: a contract object (from :py:meth:`EthClient.deploy` or :py:meth:`EthClient.use`). :param func: function name :param args: function arguments """ contract_function = getattr(contract._contract.functions, func)(*args) self._data.append(("eth_call", [contract_function.buildTransaction()])) self._calls.append((contract, contract_function))
[docs] def execute(self) -> list: """Execute the call batch :return: a list containing the result from each function call, in the same order in which they were added. """ out = [] results = self.client.rpc.batch_call(self._data) for (contract, contract_function), result in zip(self._calls, results): output_types = web3.contract.get_abi_output_types(contract_function.abi) output_data = web3.contract.decode_abi(output_types, binascii.unhexlify(result[2:])) normalizers = itertools.chain( web3.contract.BASE_RETURN_NORMALIZERS, contract_function._return_data_normalizers) normalized_data = tuple(web3.contract.map_abi_data(normalizers, output_types, output_data)) if len(normalized_data) > 1: out.append(normalized_data) else: out.append(normalized_data[0]) return out