Source code for solitude.debugger.evm_debug_core

# Copyright (c) 2019, Solitude Developers
#
# This source code is licensed under the BSD-3-Clause license found in the
# COPYING file in the root directory of this source tree

from collections import deque
from typing import Optional, Dict, List, Union

from web3 import Web3

from solitude._internal.oi_serializable import ISerializable
from solitude._internal import EnumType
from solitude.client.eth_client import ETHClient
from solitude.debugger.evm_trace import EvmTrace, TraceStep, CallStackEvent  # noqa


class ValueKind(EnumType):
    # Classifies a debug value by how the debugger obtained it. The kind
    # decides where EvmDebugCore.step() stores the value.

    # Value extracted from an AST node that carries a "name"; stored in the
    # current frame's locals.
    VARIABLE = "variable"
    # Value extracted from an AST node without a "name"; it is named after
    # the source text of the node and stored on the step itself.
    TEMPORARY = "temporary"
    # Value extracted from a function's return parameters; appended to the
    # current frame's return values.
    RETURN = "return"


class Value(ISerializable):
    """Value debug information

    It represents a value associated to a named entity in the source code.
    Only supports numeric values.
    """

    def __init__(self, vtype: str, name: str, value, kind: str, origin=None):
        """Create a Value object

        :param vtype: value type name
        :param name: value name
        :param value: integer content of value
        :param kind: one of ValueKind enum values
        :param origin: type of AST node from which the variable information
            was extracted
        """
        self.type = vtype
        self.name = name
        self.value = value
        self.kind = kind
        self.origin = origin

    def __str__(self):
        # An empty or missing name is rendered as "?".
        return "{type} {name} = {value}".format(
            type=self.type,
            name=self.name if self.name else "?",
            value=repr(self.value_repr()))

    def value_repr(self) -> Union[str, int]:
        """Get the display form of the value

        :return: for values of type "address", the address as a hex string
            (checksummed when it is exactly 20 bytes wide); for every other
            type, the stored value itself, unchanged
        """
        if self.type != "address":
            return self.value
        value = "0x{:040x}".format(self.value)
        # The format pads to 40 hex digits but does not truncate: a value
        # wider than 160 bits yields a longer string, which is returned
        # as plain hex without checksumming.
        if len(value) == 42:
            value = Web3.toChecksumAddress(value)
        return value

    def to_obj(self):
        """Serialize the value to a dictionary

        :return: dictionary with the raw attributes plus the ``str`` and
            ``repr`` of the display form and the ``str`` of the object
        """
        value = self.value_repr()
        return {
            "type": self.type,
            "name": self.name,
            "value": self.value,
            "kind": self.kind,
            "origin": self.origin,
            "value_string": str(value),
            "value_repr": repr(value),
            "string": str(self)
        }

    @staticmethod
    def from_obj(obj):
        """Deserialization is not supported

        :raises NotImplementedError: always
        """
        raise NotImplementedError()
class Function(ISerializable):
    """Object containing a function call information
    """

    def __init__(self, name: str, parameters: List[Value]):
        """Create a Function object

        :param name: function name
        :param parameters: list of function parameters, as
            :py:class:`Value` objects
        """
        self.name, self.parameters = name, parameters

    def __str__(self):
        rendered_params = ", ".join(str(param) for param in self.parameters)
        return "function " + self.name + "(" + rendered_params + ")"

    def to_obj(self):
        """Serialize the function call to a dictionary

        :return: dictionary with the function name, the serialized
            parameters and the ``str`` of the object
        """
        serialized_params = []
        for param in self.parameters:
            serialized_params.append(param.to_obj())
        return {
            "name": self.name,
            "parameters": serialized_params,
            "string": str(self)
        }

    @staticmethod
    def from_obj(obj):
        """Deserialization is not supported

        :raises NotImplementedError: always
        """
        raise NotImplementedError()
class Frame(ISerializable):
    """Call stack frame information

    :ivar ~.locals: dictionary of local variable values
    :ivar ~.return_values: list of values produced by return statements
    :ivar ~.function: function call information

    Only the three attributes above are kept by serialization; the step
    information is lost.
    """

    def __init__(self, prev: TraceStep, cur: TraceStep):
        """Create a Frame object

        :param prev: step before entering the function
        :param cur: step after entering the function
        """
        self.prev, self.cur = prev, cur
        self.locals = {}  # type: Dict[str, Value]
        self.return_values = []  # type: List[Value]
        self.function = None  # type: Optional[Function]

    def to_obj(self):
        """Serialize the frame to a dictionary

        :return: dictionary with the serialized locals, return values and
            function (None when no function is associated to the frame)
        """
        locals_obj = {}
        for name, value in self.locals.items():
            locals_obj[name] = value.to_obj()
        return_values_obj = [value.to_obj() for value in self.return_values]
        if self.function is None:
            function_obj = None
        else:
            function_obj = self.function.to_obj()
        return {
            "locals": locals_obj,
            "return_values": return_values_obj,
            "function": function_obj
        }

    @staticmethod
    def from_obj(obj):
        """Deserialization is not supported

        :raises NotImplementedError: always
        """
        raise NotImplementedError()
class Step:
    """Single instruction step information

    :ivar ~.ast: AST nodes mapped to the instruction, as dictionary of
        (node type name -> node dict)
    :ivar ~.values: values associated to this instruction (variable
        assignment, value produced by evaluation of statement, ...)
    """

    def __init__(self, step: Optional[TraceStep], event: Optional[CallStackEvent]):
        """Create a Step object

        :param step: step information
        :param event: call stack event associated with the step

        This object may be created empty, with null step and event data.
        """
        self.ast = {}  # type: Dict[str, dict]
        self.values = {}  # type: Dict[str, Value]
        self.step, self.event = step, event

    @property
    def valid(self):
        """Whether this object contains step information or is empty

        :return: True if not empty, otherwise False
        """
        if self.step is None:
            return False
        return True
class EvmDebugCore:
    """Provides common debugger-like access to the EVM's debug information
    """

    # Shared placeholder for window slots that hold no trace step.
    INVALID_STEP = Step(None, CallStackEvent(None, None))

    def __init__(self, client: ETHClient, txhash: bytes, windowsize=50):
        """Create an EvmDebugCore.

        :param client: an `ETHClient` connected to the ETH node
        :param txhash: transaction hash, as bytes
        :param windowsize: amount of previous and next steps buffered, for a
            total of previous (windowsize) + current (1) + next (windowsize).
        """
        self._client = client
        self._dbg = EvmTrace(client.rpc, client.contracts)
        self._txhash = txhash
        self._astmaps = self._create_ast_maps(client.contracts)
        self._windowsize = windowsize
        self._window_offset = 0
        self._window = [EvmDebugCore.INVALID_STEP] * (1 + 2 * self._windowsize)
        self._frames = []  # type: List[Frame]
        self._iter = self._dbg.trace_iter(txhash)
        # Fill the "next" half of the window and place the first trace step
        # at the current position.
        self._move_window(self._windowsize + 1)
        first_step = self._get_window_rel(0).step
        self._push_frame(Frame(prev=first_step, cur=first_step))

    def _create_ast_maps(self, compiled):
        """Index the AST nodes of every source file by their "src" string

        :param compiled: object whose ``contracts`` mapping holds, for each
            contract, the source path and AST under the "_solitude" key
        :return: dictionary of (source path -> (src string -> list of nodes))
        """
        out = {}
        for contract in compiled.contracts.values():
            source_path = contract["_solitude"]["sourcePath"]
            if source_path in out:
                # Another contract from the same source was already indexed.
                continue
            castmap = {}
            # Breadth-first walk; a deque keeps the original visiting order
            # while making the pop from the front O(1).
            pending = deque([contract["_solitude"]["ast"]])
            while pending:
                node = pending.popleft()
                if isinstance(node, dict):
                    if "src" in node:
                        castmap.setdefault(node["src"], []).append(node)
                    for value in node.values():
                        # NOTE(review): a dict-valued child is not queued
                        # itself, only its values are, so that child is never
                        # recorded in the map - confirm this is intended.
                        if isinstance(value, dict):
                            pending.extend(value.values())
                        elif isinstance(value, list):
                            pending.extend(value)
                elif isinstance(node, list):
                    pending.extend(node)
            out[source_path] = castmap
        return out

    def _get_window_rel(self, i):
        """Get the buffered step at offset `i` from the current step

        :param i: offset relative to the current step
        :return: the :py:class:`Step` at that offset, or `INVALID_STEP` when
            the offset falls outside the buffered window
        """
        index = self._windowsize + i
        # The lower bound matters: a negative index would otherwise wrap
        # around and return a step from the far end of the window.
        if 0 <= index < len(self._window):
            return self._window[index]
        return EvmDebugCore.INVALID_STEP

    def _get_window_abs(self, i):
        """Get the buffered step at position `i`, counted in window moves"""
        return self._get_window_rel(i - self._window_offset)

    def _push_frame(self, f):
        """Push a frame on top of the call stack"""
        self._frames.append(f)

    def _pop_frame(self):
        """Remove the top frame of the call stack

        :raises IndexError: if the call stack is empty
        """
        del self._frames[-1]

    def _get_frame(self, i):
        """Get the frame `i` levels below the top of the call stack

        :raises IndexError: if there is no such frame
        """
        return self._frames[-1 - i]

    def _move_window(self, n):
        """Slide the window `n` steps forward

        Steps are pulled from the trace iterator; once it is exhausted the
        window is padded with `INVALID_STEP`.
        """
        for _ in range(n):
            del self._window[0]
            try:
                step, event = next(self._iter)
            except StopIteration:
                self._window.append(EvmDebugCore.INVALID_STEP)
            else:
                s = Step(step, event)
                s.ast = self._get_ast_nodes(s.step)
                self._window.append(s)
            self._window_offset += 1

    def _extract_variable(
            self,
            step: TraceStep,
            astnode: dict,
            stackpos: int,
            origin=None) -> List[Value]:
        """Read the value described by an AST node from the EVM stack

        :param step: trace step providing the stack
        :param astnode: AST node describing the value
        :param stackpos: position of the value, counted from the stack top
        :param origin: type of AST node the request originates from
        :return: list with the extracted :py:class:`Value`, or an empty list
            if the stack is too short
        :raises KeyError: if an unnamed node has no "src" entry
        """
        vartype = astnode.get("typeDescriptions", {}).get("typeString", "T?")
        varname = astnode.get("name", None)
        varkind = ValueKind.VARIABLE
        if varname is None:
            # Unnamed node: use the source text it spans as its name.
            st, le, fi = [int(x) for x in astnode["src"].split(":")]
            source = self._dbg.srcmapper.get_source(step.contractname, st, le, fi)
            varname = source.source[st:st + le]
            varkind = ValueKind.TEMPORARY
        try:
            varvalue = int(step.stack[-1 - stackpos], 16)
        except IndexError:
            return []
        return [Value(
            vtype=vartype,
            name=varname,
            value=varvalue,
            kind=varkind,
            origin=origin)]

    def _get_ast_nodes(self, step: TraceStep):
        """Get the AST nodes whose source range matches the step exactly

        :param step: trace step
        :return: dictionary of (node type name -> node dict); empty when no
            node is mapped to the step
        """
        ast_src = "%d:%d:%d" % (step.start, step.length, step.fileno)
        out = {}
        try:
            for node in self._astmaps[step.code.unitname][ast_src]:
                out[node["nodeType"]] = node
        except KeyError:
            pass
        return out

    def _search_ExpressionStatement(self, s: Step, stmt: dict) -> List[Value]:
        """Extract the assigned value when the statement is an assignment

        :return: list with the left hand side value read from the top of the
            stack, or an empty list
        """
        values = []
        try:
            if stmt["expression"]["nodeType"] == "Assignment":
                node = stmt["expression"]["leftHandSide"]
                values.extend(
                    self._extract_variable(
                        s.step, node, 0, origin="ExpressionStatement"))
        except KeyError:
            pass
        return values

    def _search_VariableDeclarationStatement(self, s: Step, stmt: dict) -> List[Value]:
        """Extract the value of the first declared variable

        :return: list with the value read from the top of the stack, or an
            empty list when the first declaration is missing or is not a
            variable declaration
        """
        values = []
        try:
            declarations = stmt["declarations"]
            # The list may be empty or hold a null first entry; treat both
            # as "nothing to extract" instead of failing.
            node = declarations[0] if declarations else None
            if isinstance(node, dict) and node["nodeType"] == "VariableDeclaration":
                values.extend(
                    self._extract_variable(
                        s.step, node, 0, origin="VariableDeclarationStatement"))
        except KeyError:
            pass
        return values

    def _search_FunctionDefinition(self, s: Step, stmt: dict) -> Optional[Function]:
        """Extract the function parameters from the stack

        The last parameter is read from the top of the stack.

        :return: a :py:class:`Function`, or None if the node is incomplete
            or the stack holds fewer than (number of parameters + 1) entries
        """
        values = []
        try:
            name = stmt["name"]
            params = stmt["parameters"]["parameters"]
            num_params = len(params)
            # presumably the extra entry is the return address - TODO confirm
            if len(s.step.stack) < num_params + 1:
                return None
            for param_index, param_node in enumerate(params):
                values.extend(
                    self._extract_variable(
                        s.step,
                        param_node,
                        num_params - param_index - 1,
                        origin="FunctionDefinition"))
        except KeyError:
            return None
        return Function(name=name, parameters=values)

    def _search_FunctionReturn(self, s: Step, stmt: dict) -> Optional[Function]:
        """Extract the function return values from the stack

        The last return parameter is read one entry below the top of the
        stack. Extracted values are marked as `ValueKind.RETURN`.

        :return: a :py:class:`Function` whose parameters are the return
            values, or None if the node is incomplete or the stack holds
            fewer than (number of return parameters + 2) entries
        """
        values = []
        try:
            name = stmt["name"]
            params = stmt["returnParameters"]["parameters"]
            num_params = len(params)
            if len(s.step.stack) < num_params + 2:
                return None
            for param_index, param_node in enumerate(params):
                values.extend(
                    self._extract_variable(
                        s.step,
                        param_node,
                        num_params - param_index,
                        origin="FunctionReturn"))
            for var in values:
                var.kind = ValueKind.RETURN
        except KeyError:
            return None
        return Function(name=name, parameters=values)

    def step(self):
        """Step one instruction forward
        """
        self._move_window(1)
        s = self._get_window_rel(0)
        if not s.valid:
            return
        if s.event.event == "push":
            self._push_frame(
                Frame(prev=s.event.data.prev, cur=s.event.data.step))
        elif s.event.event == "pop":
            self._pop_frame()

        try:
            f = self._get_frame(0)
        except IndexError:
            # TODO inline assembly delegatecall not detected and no frame being
            # pushed on the stack. When using proxy contracts, causes the first
            # frame to be missing, and the last return to be unmatched
            return

        # analyze locals: the first rule that produces values wins
        ast = s.ast
        values = []
        if not values and "ExpressionStatement" in ast and s.step.op == "SWAP1":
            values.extend(
                self._search_ExpressionStatement(s, ast["ExpressionStatement"]))
        if (not values and "VariableDeclarationStatement" in ast
                and s.step.op in ("SWAP1", "SWAP2", "SWAP3")):
            values.extend(
                self._search_VariableDeclarationStatement(
                    s, ast["VariableDeclarationStatement"]))
        if not values and "FunctionDefinition" in ast and s.step.op == "JUMP":
            function = self._search_FunctionReturn(s, ast["FunctionDefinition"])
            # Only accept return values of the function this frame entered.
            if (function is not None and f.function is not None
                    and f.function.name == function.name):
                values.extend(function.parameters)
        if not values and "FunctionDefinition" in ast and s.step.op == "JUMPDEST":
            function = self._search_FunctionDefinition(s, ast["FunctionDefinition"])
            # Only the first function seen in a frame is bound to it.
            if function is not None and f.function is None:
                values.extend(function.parameters)
                f.function = function
        if not values and s.step.op == "CALLVALUE":
            # The result of the instruction is on top of the next step's stack.
            snext = self._get_window_rel(1)
            if snext.valid:
                varvalue = int(snext.step.stack[-1], 16)
                values.extend([
                    Value(
                        vtype="uint256",
                        name="msg.value",
                        value=varvalue,
                        kind=ValueKind.VARIABLE)])

        for var in values:
            if var.kind == ValueKind.TEMPORARY:
                s.values[var.name] = var
            elif var.kind == ValueKind.VARIABLE:
                f.locals[var.name] = var
            elif var.kind == ValueKind.RETURN:
                f.return_values.append(var)

    def get_frames(self) -> List[Frame]:
        """Get call stack frames

        :return: a list of :py:class:`Frame`, most recent frame first
        """
        return self._frames[::-1]

    def get_callstack_depth(self) -> int:
        """Get the call stack depth

        :return: number of frames in the call stack
        """
        return len(self._frames)

    def get_values(self) -> Dict[str, Value]:
        """Get named values in the current step, from the locals of the
        current frame and the values attached to the step itself.

        :return: dictionary of (value name -> :py:class:`Value`); on a name
            clash the step value takes precedence over the frame local
        :raises IndexError: if the call stack is empty
        """
        s = self._get_window_rel(0)
        f = self._get_frame(0)
        out = {}  # type: Dict[str, Value]
        out.update(f.locals)
        out.update(s.values)
        return out

    def get_step(self, offset=0) -> Step:
        """Get step, relative to current step.

        :param offset: step offset, relative to the current one. Can be in
            range [-windowsize, windowsize], according to the windowsize
            value provided in the constructor.
        :return: a :py:class:`Step`; an empty (not valid) step is returned
            for offsets outside the window or beyond the end of the trace
        """
        return self._get_window_rel(offset)