# Copyright (c) 2019, Solitude Developers
#
# This source code is licensed under the BSD-3-Clause license found in the
# COPYING file in the root directory of this source tree
from typing import List, Dict, Tuple, Optional, Iterator, Sequence
from collections import namedtuple
import hashlib
import binascii
import bisect
from solitude.common import RPCClient
from solitude.common import ContractObjectList, hex_repr
# Entry of the decoder stack kept while iterating over a trace: the source
# unit name and contract name of the running code, plus the decoder object
# used to map its program counter to a source position.
TraceStackItem = namedtuple(
    "TraceStackItem",
    ("unitname", "contractname", "decoder"),
)
SourceMapping = namedtuple(
    "SourceMapping",
    ("unitname", "source", "lines", "line_index", "line_start", "line_pos"),
)
SourceMapping.__doc__ = "Source code and line information related to an instruction"
# Attach a description to each generated field descriptor.
for _field_name, _field_doc in (
        ("unitname", "source unit name"),
        ("source", "full source text"),
        ("lines", "full source text split in lines"),
        ("line_index", "line index where the relevant portion begins"),
        ("line_start", "index of the character where the line starts in the file"),
        ("line_pos", "index of the column where the relevant portion begins (in line)"),
):
    getattr(SourceMapping, _field_name).__doc__ = _field_doc
del _field_name, _field_doc
TraceStep = namedtuple(
    "TraceStep",
    (
        # position of the step in the trace
        "index", "depth", "contractname",
        # machine state reported for the instruction
        "pc", "op", "stack", "memory", "storage", "gas", "error",
        # decoded source map entry
        "start", "length", "fileno", "jumptype",
        # resolved source code information
        "code",
    ),
)
TraceStep.__doc__ = "Debugger step (instruction) information"
# Attach a description to each generated field descriptor.
for _field_name, _field_doc in (
        ("index", "incrementing index of the step"),
        ("depth", "call stack depth"),
        ("contractname", "contract name"),
        ("pc", "program counter"),
        ("op", "opcode string"),
        ("stack", "EVM stack as list of hex strings"),
        ("memory", "EVM memory as list of hex strings"),
        ("storage", "EVM storage as dictionary of hex strings"),
        ("gas", "Gas cost of the instruction"),
        ("error", "Error message"),
        ("start", "index of the character in the source file where the source code "
                  "mapped to this instruction starts"),
        ("length", "length of the source code mapped to this instruction"),
        ("fileno", "index which identifies the source unit"),
        ("jumptype", "type of jump, `'i'` for 'jump into call', `'o'`, "
                     "for 'jump out of call', or empty (`''`)"),
        ("code", "a SourceMapping object containing the source code and line information"),
):
    getattr(TraceStep, _field_name).__doc__ = _field_doc
del _field_name, _field_doc
CallStackElement = namedtuple("CallStackElement", "prev step")
CallStackElement.__doc__ = "Basic stack frame information"
# Attach a description to each generated field descriptor.
for _field_name, _field_doc in (
        ("prev", "the TraceStep before entering a call"),
        ("step", "the TraceStep after entering a call"),
):
    getattr(CallStackElement, _field_name).__doc__ = _field_doc
del _field_name, _field_doc
CallStackEvent = namedtuple("CallStackEvent", "event data")
CallStackEvent.__doc__ = "Call stack event information"
# Attach a description to each generated field descriptor.
for _field_name, _field_doc in (
        ("event", "Type of event. Can be `'push'`, `'pop'` or `None`"),
        ("data", "Event data. If the event is of type `'push'`, a :py:class:`CallStackElement`"),
):
    getattr(CallStackEvent, _field_name).__doc__ = _field_doc
del _field_name, _field_doc
class EvmTrace:
    """Access debug information from the ETH server

    Combines the instruction trace returned by the server for a transaction
    with the compiler output of the known contracts, so that every executed
    instruction can be related to a position in the source code.
    """

    def __init__(self, rpc: RPCClient, contracts: ContractObjectList):
        """Create an EvmTrace instance

        :param rpc: RPC client connected to the ETH server
        :param contracts: a collection of contracts (see ContractObjectList)
        """
        self._rpc = rpc
        self._compiled = contracts
        self._address_to_contract = AddressToContract()
        self._address_to_contract.initialize(rpc, self._compiled)
        self.srcmapper = SourceMapper(self._compiled)

    def _open_frame(self, address) -> TraceStackItem:
        """Build the decoder stack entry for the code running at an address

        When the address is not known, or the contract it maps to is not in
        the compiled collection, a dummy decoder is used. In that case the
        unit and contract names are whatever could be resolved, `None`
        otherwise.

        :param address: address of the contract being entered
        :return: a TraceStackItem for the new frame
        """
        # Bound up front so that the fallback below never reads an unbound
        # (or stale) name when the address lookup itself fails.
        unitname = None
        contractname = None
        try:
            unitname, contractname = self._address_to_contract.get_contract_id(address)
            contract = self._compiled.contracts[(unitname, contractname)]
            decoder = FrameDecoder(contract=contract)
        except KeyError:
            decoder = FrameDecoderDummy()
        return TraceStackItem(
            unitname=unitname,
            contractname=contractname,
            decoder=decoder)

    def trace_iter(self, txhash: bytes) -> Iterator[Tuple[TraceStep, CallStackEvent]]:
        """Iterate contract execution steps (instructions)

        :param txhash: transaction hash to inspect, as byte array
        :return: generator of tuples of (TraceStep, CallStackEvent)
        """
        txhash_hex = hex_repr(txhash)
        transaction = self._rpc.eth_getTransactionByHash(txhash_hex)
        debug_trace = self._rpc.debug_traceTransaction(txhash_hex, {})
        logs = debug_trace["structLogs"]
        callstack = CallStack()
        tracestack = []  # type: List[TraceStackItem]
        prev_depth = -1
        for i, log in enumerate(logs):
            depth, pc, op, error, gas, memory, stack, storage = (
                log["depth"], log["pc"], log["op"], log["error"], log["gas"], log["memory"],
                log["stack"], log["storage"])
            # When entering a call, create a new decoder for the relevant
            # contract. The first step always opens the outermost frame,
            # whatever depth value the server reports for it.
            if i == 0 or depth == prev_depth + 1:
                if i == 0:
                    address = transaction["to"]
                else:
                    # contract address is in element -2 of the stack of the
                    # previous step; the first 24 characters of the stack
                    # word are dropped
                    address = "0x" + logs[i - 1]["stack"][-2][24:]
                tracestack.append(self._open_frame(address))
            elif depth == prev_depth - 1:
                # returned from a call: drop its decoder
                del tracestack[-1]
            prev_depth = depth
            # use the relevant decoder to map source
            frame = tracestack[-1]
            st, le, fi, ju = frame.decoder.get_mapping(address=pc)
            source = self.srcmapper.get_source(frame.unitname, st, le, fi)
            step = TraceStep(
                index=i, depth=depth, contractname=frame.contractname,
                pc=pc, op=op, stack=stack, memory=memory, storage=storage, gas=gas, error=error,
                start=st, length=le, fileno=fi, jumptype=ju,
                code=source)
            callstack_event = callstack.add(step)
            yield step, callstack_event
class CallStack:
    """Track call stack changes while trace steps are fed in one at a time

    The stack is kept as a list of frames lists: a new inner list is started
    for a step at program counter 0 following a `call`, while jumps marked
    as 'jump into call' are appended to the innermost list.
    """

    def __init__(self):
        self._stack = [[]]
        self._prev_step = None

    def add(self, step: TraceStep) -> CallStackEvent:
        """Process the next step and report the call stack change it causes

        :param step: the step being executed
        :return: a CallStackEvent; its event is `'push'`, `'pop'` or `None`,
            and its data is the pushed CallStackElement for a `'push'`
        """
        previous = self._prev_step
        previous_op = None if previous is None else previous.op.lower()
        current_op = step.op.lower()
        kind = None
        pushed = None
        if current_op == 'jumpdest':
            if previous_op == 'jump':
                innermost = self._stack[-1]
                if innermost and step.pc == innermost[-1].prev.pc + 1:
                    # landed right after the instruction which made the last
                    # jump into a call: that call has returned
                    del innermost[-1]
                    kind = "pop"
                elif previous.jumptype == 'i':
                    pushed = CallStackElement(previous, step)
                    kind = "push"
                    innermost.append(pushed)
        elif step.pc == 0 and previous_op == 'call':
            # first instruction executed after a `call` opcode
            pushed = CallStackElement(previous, step)
            kind = "push"
            self._stack.append([pushed])
        elif previous_op == 'stop':
            kind = "pop"
            del self._stack[-1]
        self._prev_step = step
        return CallStackEvent(event=kind, data=pushed)

    @property
    def stack(self):
        """All the tracked frames as a single flat list, outermost first"""
        flattened = []
        for frames in self._stack:
            flattened.extend(frames)
        return flattened
class IFrameDecoder:
    """Base class of the objects mapping an instruction address to source

    Subclasses must implement :py:meth:`get_mapping`.
    """

    def __init__(self, contract: Optional[dict]=None):
        """
        :param contract: contract data used by the decoder, if any
        """
        self._contract = contract

    def get_mapping(self, address: int):
        """Get the source mapping for an instruction address

        :param address: instruction address
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
class FrameDecoderDummy(IFrameDecoder):
    """Decoder which returns the same placeholder mapping for any address"""

    # (start, length, file number, jump type) returned for every address
    _PLACEHOLDER = (0, 0, 0, '-')

    def __init__(self, contract: Optional[dict]=None):
        """
        :param contract: contract data, stored but not used for decoding
        """
        super().__init__(contract)

    def get_mapping(self, address: int) -> Tuple[int, int, int, str]:
        """Get the placeholder mapping

        :param address: instruction address (ignored)
        :return: the tuple `(0, 0, 0, '-')`
        """
        return self._PLACEHOLDER
class FrameDecoder(IFrameDecoder):
    """Decoder mapping instruction addresses of a contract to source positions

    Built from the `"bin-runtime"` and `"srcmap-runtime"` entries of the
    contract data.
    """

    def __init__(self, contract: dict):
        """
        :param contract: contract data containing the `"bin-runtime"` (hex
            string) and `"srcmap-runtime"` (compressed source map) entries
        """
        super().__init__(contract)
        assert self._contract is not None
        runtime_hex = self._contract["bin-runtime"]
        self._bytecode = binascii.unhexlify(runtime_hex)
        # Instruction address (byte offset in the bytecode) to instruction
        # number, to relate the program counter to a source map entry.
        self._address_to_instruction_number = (
            FrameDecoder._map_address_to_instruction_number(self._bytecode))
        # Instruction number to (start, length, file number, jump type), to
        # find the source code portion relevant to an instruction. The source
        # map in the compiler output is compressed and is expanded here.
        compressed_map = self._contract["srcmap-runtime"]
        self._instruction_number_to_source = (
            FrameDecoder._decode_source_map(compressed_map))

    def get_mapping(self, address: int) -> Tuple[int, int, int, str]:
        """Get the source mapping for an instruction address

        :param address: byte offset of the instruction in the bytecode
        :return: tuple of (start, length, file number, jump type)
        """
        number = self._address_to_instruction_number[address]
        return self._instruction_number_to_source[number]

    @staticmethod
    def _decode_source_map(srcmap: str) -> List[Tuple[int, int, int, str]]:
        """Expand a compressed source map into one tuple per instruction

        Each tuple is (st, le, fi, ju):

        - st: start character in source code
        - le: length of code portion
        - fi: file number for this mapping
        - ju: jump type

        In the string, mappings are separated by ";" and the elements of a
        mapping are separated by ":". From
        https://github.com/ethereum/solidity/blob/develop/docs/miscellaneous.rst#source-mappings
        the following compression rules are used:

        - If a field is empty, the value of the preceding element is used.
        - If a : is missing, all following fields are considered empty.

        :param srcmap: compressed source map string
        :return: list of expanded mappings
        """
        expanded = []
        # values carried over from the preceding element
        start, length, fileno, jump = 0, 0, 0, '-'
        for entry in srcmap.split(";"):
            fields = entry.split(':')
            count = len(fields)
            if count > 0 and fields[0]:
                start = int(fields[0])
            if count > 1 and fields[1]:
                length = int(fields[1])
            if count > 2 and fields[2]:
                fileno = int(fields[2])
            if count > 3 and fields[3]:
                jump = fields[3]
            expanded.append((start, length, fileno, jump))
        return expanded

    @staticmethod
    def _map_address_to_instruction_number(bytecode) -> List[int]:
        """Build the table from byte offset to instruction number

        All instructions have length 1, except PUSH1[0x60]..PUSH32[0x7f]
        which have length 2..33. Every byte of an instruction, including its
        immediate data, gets the number of that instruction.

        :param bytecode: contract bytecode
        :return: list with one instruction number per byte offset
        """
        table = []
        total = len(bytecode)
        offset = 0
        number = 0
        while offset < total:
            opcode = bytecode[offset]
            if 0x60 <= opcode <= 0x7f:
                # opcode byte followed by 1..32 bytes of immediate data
                width = 1 + (opcode - 0x5f)
            else:
                width = 1
            table.extend([number] * width)
            number += 1
            offset += width
        return table
class SourcePosToLine:
    """Map character offsets in a source text to line information"""

    def __init__(self, source: Optional[str]):
        """
        :param source: full source text, or None to get an empty source
            with a single empty line
        """
        if source is None:
            self._source = ""
            self._lines = [""]
            # offsets of the newline characters in the source, ascending
            self._splits = []  # type: List[int]
        else:
            self._source = source
            self._lines = source.split('\n')
            self._splits = [i for i, c in enumerate(source) if c == '\n']

    def line_of(self, index: int) -> Tuple[int, int]:
        """Find the line containing a character offset

        The offset of a newline character is attributed to the line which
        follows it.

        :param index: character offset in the source text
        :return: tuple of (line index, offset of the first character of
            that line in the source text)
        """
        line_index = bisect.bisect_right(self._splits, index)
        if line_index == 0:
            # First line: there is no preceding newline. This must be
            # checked explicitly, because indexing with -1 would wrap around
            # to the last newline instead of failing.
            return line_index, 0
        return line_index, self._splits[line_index - 1] + 1

    @property
    def source(self):
        """The full source text"""
        return self._source

    @property
    def lines(self):
        """The full source text split in lines"""
        return self._lines
class SourceMapper:
    """Resolve source map entries to source code and line information"""

    def __init__(self, contracts: ContractObjectList):
        """
        :param contracts: a collection of contracts (see ContractObjectList)
        """
        self._compiled = contracts
        # source unit name -> line lookup helper for its source text
        self._unitname_to_posmapper = {}  # type: Dict[str, SourcePosToLine]
        # (source unit name, file number) -> name of the unit the file
        # number refers to
        self._unitname_fi_to_unitname = {}  # type: Dict[Tuple[str, int], str]
        for contract_id, contract in self._compiled.contracts.items():
            unitname, _contractname = contract_id
            self._unitname_to_posmapper[unitname] = (
                SourcePosToLine(contract["_solitude"]["source"]))
            self._unitname_fi_to_unitname.update(
                ((unitname, fi), fi_unitname)
                for fi, fi_unitname in enumerate(contract["_solitude"]["sourceList"]))
        # used when the source of a mapping cannot be found
        self._nullposmapper = SourcePosToLine(None)

    def get_unitname(self, unitname: str, fi: int) -> str:
        """Get the name of the source unit a file number refers to

        :param unitname: name of the source unit the file number comes from
        :param fi: file number
        :return: source unit name
        :raises KeyError: if the pair is not known
        """
        key = (unitname, fi)
        return self._unitname_fi_to_unitname[key]

    def get_source(self, unitname: str, st: int, le: int, fi: int) -> SourceMapping:
        """Get source code and line information for a source map entry

        If the source unit cannot be resolved, an empty source is used and
        the unit name in the result is `None`.

        :param unitname: name of the source unit of the running contract
        :param st: start character in source code
        :param le: length of code portion (not used here)
        :param fi: file number for this mapping
        :return: a SourceMapping object
        """
        resolved_unit = None  # type: Optional[str]
        posmapper = self._nullposmapper
        try:
            candidate = self.get_unitname(unitname, fi)
            posmapper = self._unitname_to_posmapper[candidate]
            resolved_unit = candidate
        except KeyError:
            pass
        line_index, line_start = posmapper.line_of(st)
        column = st - line_start
        if column < 0:
            # start is before the beginning of the line found: fall back to
            # the length of that line
            column = len(posmapper.lines[line_index])
        return SourceMapping(
            unitname=resolved_unit,
            source=posmapper.source,
            lines=posmapper.lines,
            line_index=line_index,
            line_start=line_start,
            line_pos=column)
class AddressToContract:
    """Find which known contract is deployed at an address

    The mapping is built by scanning the transactions in the blocks of the
    chain and comparing the data of those which created a contract with the
    bytecode of the compiled contracts.
    """

    def __init__(self):
        # contract address -> (unitname, contractname); the pair is
        # (None, None) for a created contract which matched no known bytecode
        self._address_to_contract_id = {}  # type: Dict[str, Tuple[Optional[str], Optional[str]]]

    def initialize(self, client: RPCClient, compiled: ContractObjectList):
        """Scan the chain and record the contract deployed at each address

        All blocks from "earliest" to "latest" are fetched, with a receipt
        request for every transaction carrying input data.

        :param client: RPC client connected to the ETH server
        :param compiled: a collection of contracts (see ContractObjectList)
        """
        earliest_block = client.eth_getBlockByNumber("earliest", False)
        start_block = int(earliest_block["number"][2:], 16)
        latest_block = client.eth_getBlockByNumber("latest", False)
        end_block = int(latest_block["number"][2:], 16)
        contracts_bin = [
            (unitname, contractname, binascii.unhexlify(contract["bin"]))
            for (unitname, contractname), contract in compiled.contracts.items()
        ]
        for block_number in range(start_block, end_block + 1):
            block = client.eth_getBlockByNumber(hex(block_number), True)
            for transaction in block["transactions"]:
                if len(transaction["input"]) <= 4:
                    # too short to carry any bytecode
                    continue
                receipt = client.eth_getTransactionReceipt(transaction["hash"])
                contract_address = receipt["contractAddress"]
                if contract_address is None:
                    # the transaction did not create a contract
                    continue
                # skip the "0x" prefix of the input data
                contract_bytecode = binascii.unhexlify(transaction["input"][2:])
                contract_id = self._search_contract(contracts_bin, contract_bytecode)
                self._address_to_contract_id[contract_address] = contract_id

    def _search_contract(
            self,
            contracts_bin: Sequence[Tuple[str, str, bytes]],
            contract_bytecode: bytes) -> Tuple[Optional[str], Optional[str]]:
        """Find the known contract whose bytecode begins the given data

        When several contracts match, the one with the longest bytecode is
        chosen. Contracts with empty bytecode never match.

        :param contracts_bin: tuples of (unitname, contractname, bytecode)
        :param contract_bytecode: data of the transaction creating a contract
        :return: tuple of (unitname, contractname), or `(None, None)` if no
            contract matches
        """
        match_length = 0
        match_id = (None, None)  # type: Tuple[Optional[str], Optional[str]]
        for unitname, contractname, bytecode in contracts_bin:
            if len(bytecode) > match_length and contract_bytecode.startswith(bytecode):
                match_length = len(bytecode)
                match_id = (unitname, contractname)
        return match_id

    def get_contract_id(self, address: str) -> Tuple[Optional[str], Optional[str]]:
        """Get the contract deployed at an address

        :param address: contract address, as found in the transaction receipt
        :return: tuple of (unitname, contractname); both are `None` if the
            contract created at that address matched no known bytecode
        :raises KeyError: if no contract creation was seen for the address
        """
        return self._address_to_contract_id[address]