diff --git a/docs/configuration.md b/docs/configuration.md
index 8e8a3bdc..faf859ff 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -88,7 +88,7 @@ Wake can be configured using configuration options loaded from multiple sources
[general]
call_trace_options = [
"contract_name", "function_name", "named_arguments", "status",
- "call_type", "value", "return_value", "error"
+ "call_type", "value", "return_value", "error", "events"
]
json_rpc_timeout = 15
link_format = "vscode://file/{path}:{line}:{col}"
@@ -214,7 +214,7 @@ Every detector supports at least the `min_confidence` and `min_impact` options:
| Option | Description |
|:----------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `call_trace_options` | What information to display in call traces. Possible options: `contract_name`, `address`, `function_name`, `named_arguments`, `arguments`, `status`, `call_type`, `value`, `gas`, `sender`, `return_value`, `error`. |
+| `call_trace_options` | What information to display in call traces. Possible options: `contract_name`, `address`, `function_name`, `named_arguments`, `arguments`, `status`, `call_type`, `value`, `gas`, `sender`, `return_value`, `error`, `events`. |
| `json_rpc_timeout` | Timeout in seconds when communicating with a node via JSON-RPC. |
| `link_format` | Format of links to source code files used in detectors and printers. The link should contain `{path}`, `{line}` and `{col}` placeholders. |
diff --git a/wake/config/data_model.py b/wake/config/data_model.py
index f5afc758..0c6cfac8 100644
--- a/wake/config/data_model.py
+++ b/wake/config/data_model.py
@@ -145,7 +145,9 @@ class SolcConfig(WakeConfigModel):
"""
Solidity files in these paths are excluded from compilation unless imported from a non-excluded file.
"""
- include_paths: FrozenSet[PurePath] = Field(default_factory=lambda: frozenset([Path.cwd() / "node_modules"]))
+ include_paths: FrozenSet[PurePath] = Field(
+ default_factory=lambda: frozenset([Path.cwd() / "node_modules"])
+ )
"""
Paths where to search for Solidity files imported using direct (non-relative) import paths.
"""
@@ -176,7 +178,9 @@ class SolcConfig(WakeConfigModel):
Metadata config options.
"""
- _normalize_paths = field_validator("allow_paths", "include_paths", "exclude_paths", mode="before")(normalize_paths)
+ _normalize_paths = field_validator(
+ "allow_paths", "include_paths", "exclude_paths", mode="before"
+ )(normalize_paths)
@field_serializer("target_version", when_used="json")
def serialize_target_version(self, version: Optional[SolidityVersion], info):
@@ -279,7 +283,9 @@ class DetectorsConfig(WakeConfigModel):
Useful for ignoring detections in dependencies.
"""
- _normalize_paths = field_validator("ignore_paths", "exclude_paths", mode="before")(normalize_paths)
+ _normalize_paths = field_validator("ignore_paths", "exclude_paths", mode="before")(
+ normalize_paths
+ )
# namespace for detector configs
@@ -434,6 +440,7 @@ class GeneralConfig(WakeConfigModel):
"value",
"return_value",
"error",
+ "events",
]
)
"""
diff --git a/wake/development/call_trace.py b/wake/development/call_trace.py
index f0f1549a..62d5e904 100644
--- a/wake/development/call_trace.py
+++ b/wake/development/call_trace.py
@@ -2,6 +2,7 @@
import reprlib
from collections import ChainMap
+from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
AbstractSet,
@@ -165,6 +166,8 @@ def _get_info_from_explorer(addr: Address, chain_id: int) -> Optional[Tuple[str,
abi_dict[eth_utils.abi.function_abi_to_4byte_selector(abi_item)] = abi_item
elif abi_item["type"] == "error":
abi_dict[eth_utils.abi.function_abi_to_4byte_selector(abi_item)] = abi_item
+ elif abi_item["type"] == "event":
+ abi_dict[eth_utils.abi.event_abi_to_log_topic(abi_item)] = abi_item
return name, abi_dict
@@ -192,46 +195,60 @@ def _decode_precompiled(
raise ValueError(f"Unknown precompiled contract address: {addr}")
-def _decode_args(
- abi, data, chain
-) -> Tuple[Optional[List], Optional[List[Optional[str]]]]:
- def normalize(arg, a):
- if a["type"] == "address":
- acc = Account(Address(arg), chain)
- if acc.label is not None:
- return acc
- else:
- return Address(arg)
- elif a["type"].endswith("]"):
+def _normalize(arg, a, chain):
+ if a["type"] == "address":
+ acc = Account(Address(arg), chain)
+ if acc.label is not None:
+ return acc
+ else:
+ return Address(arg)
+ elif a["type"].endswith("]"):
+ if "internalType" in a:
assert a["internalType"].endswith("]")
- prev_type = a["type"]
prev_internal_type = a["internalType"]
- a["type"] = "[".join(a["type"].split("[")[:-1])
a["internalType"] = "[".join(a["internalType"].split("[")[:-1])
- ret = [normalize(x, a) for x in arg]
- a["type"] = prev_type
+ prev_type = a["type"]
+ a["type"] = "[".join(a["type"].split("[")[:-1])
+
+ ret = [_normalize(x, a, chain) for x in arg]
+
+ a["type"] = prev_type
+ if "internalType" in a:
a["internalType"] = prev_internal_type
- return ret
- elif a["type"] == "uint8" and a["internalType"].startswith("enum"):
- return CustomIntEnum(a["internalType"][5:], arg)
- elif a["type"] == "tuple" and a["internalType"].startswith("struct"):
- return CustomNamedTuple(
- a["internalType"][7:],
- [c["name"] for c in a["components"]],
- *[
- normalize(arg[i], a["components"][i])
- for i in range(len(a["components"]))
- ],
- )
- else:
- return arg
+ return ret
+ elif (
+ a["type"] == "uint8"
+ and "internalType" in a
+ and a["internalType"].startswith("enum")
+ ):
+ return CustomIntEnum(a["internalType"][5:], arg)
+ elif (
+ a["type"] == "tuple"
+ and "internalType" in a
+ and a["internalType"].startswith("struct")
+ ):
+ return CustomNamedTuple(
+ a["internalType"][7:],
+ [c["name"] for c in a["components"]],
+ *[
+ _normalize(arg[i], a["components"][i], chain)
+ for i in range(len(a["components"]))
+ ],
+ )
+ else:
+ return arg
+
+
+def _decode_args(
+ abi, data, chain
+) -> Tuple[Optional[List], Optional[List[Optional[str]]]]:
input_types = [
eth_utils.abi.collapse_if_tuple(cast(Dict[str, Any], arg))
for arg in fix_library_abi(abi)
]
args = list(
- normalize(arg, type)
+ _normalize(arg, type, chain)
for arg, type in zip(eth_abi.abi.decode(input_types, data), abi)
)
arg_names = [arg["name"] for arg in abi]
@@ -239,6 +256,46 @@ def normalize(arg, a):
return args, arg_names
+def _decode_event_args(
+ abi, topics, data, chain
+) -> Tuple[Optional[List], Optional[List[Optional[str]]]]:
+ topic_index = 0
+ decoded_indexed = []
+ types = []
+
+ for arg in fix_library_abi(abi):
+ if arg["indexed"]:
+ if arg["type"] in {"string", "bytes", "tuple"} or arg["type"].endswith("]"):
+ topic_type = "bytes32"
+ else:
+ topic_type = arg["type"]
+
+ decoded_indexed.append(
+ _normalize(
+ eth_abi.abi.decode([topic_type], topics[topic_index])[0],
+ arg,
+ chain,
+ )
+ )
+ topic_index += 1
+ else:
+ types.append(eth_utils.abi.collapse_if_tuple(arg))
+
+ decoded = list(
+ _normalize(arg, type, chain)
+ for arg, type in zip(eth_abi.abi.decode(types, data), abi)
+ )
+ merged = []
+
+ for arg in abi:
+ if arg["indexed"]:
+ merged.append(decoded_indexed.pop(0))
+ else:
+ merged.append(decoded.pop(0))
+
+ return merged, [arg["name"] for arg in abi]
+
+
class CallTraceKind(StrEnum):
CALL = "CALL"
DELEGATECALL = "DELEGATECALL"
@@ -305,6 +362,13 @@ def repr_CustomNamedTuple(self, obj, level):
return f"{obj._tuple_name}({fields_str})"
+@dataclass
+class CallTraceEvent:
+ name: str
+ args: List[Any]
+ arg_names: List[Optional[str]]
+
+
class CallTrace:
_contract: Optional[Contract]
_contract_name: Optional[str]
@@ -329,8 +393,9 @@ class CallTrace:
_revert_data: Optional[bytes]
_return_value: Optional[List]
_return_names: Optional[List[Optional[str]]]
- _abi: Dict[bytes, Any] # used for error decoding
+ _abi: Dict[bytes, Any] # used for error and event decoding
_output_abi: Optional[List[Dict[str, Any]]] # used for return value decoding
+ _events: List[CallTraceEvent]
def __init__(
self,
@@ -388,6 +453,7 @@ def __init__(
"type": "error",
"inputs": [{"internalType": "uint256", "name": "code", "type": "uint256"}],
}
+ self._events = []
def __str__(self):
console = Console()
@@ -574,6 +640,28 @@ def _get_label(self) -> Text:
ret.append(", ")
ret.append(")")
+ if "events" in options:
+ for event in self._events:
+ ret.append("\n ⚡️ ")
+ ret.append_text(
+ Text.from_markup(f"[bright_yellow]{event.name}[/bright_yellow](")
+ )
+ for i, (arg, arg_name) in enumerate(zip(event.args, event.arg_names)):
+ if get_verbosity() > 0:
+ r = repr(arg)
+ else:
+ r = arg_repr.repr(arg)
+
+ if arg_name is not None and len(arg_name.strip()) > 0:
+ t = Text(f"{arg_name.strip()}={r}")
+ else:
+ t = Text(r)
+ ReprHighlighter().highlight(t)
+ ret.append_text(t)
+ if i < len(event.args) - 1:
+ ret.append(", ")
+ ret.append(")")
+
return ret
@property
@@ -604,7 +692,9 @@ def dict(self, config: WakeConfig) -> Dict[str, Union[Optional[str], List]]:
ret["contract_name"] = None
if "address" in options and self.address is not None:
- ret["address"] = Account(self.address, self.chain).label or str(self.address)
+ ret["address"] = Account(self.address, self.chain).label or str(
+ self.address
+ )
else:
ret["address"] = None
@@ -1506,5 +1596,46 @@ def from_debug_trace(
contracts.append(fqn)
values.append(value)
fqn_overrides.maps.insert(0, {})
+ elif log["op"] == "LOG0":
+ assert current_trace is not None
+ data_offset = int(log["stack"][-1], 16)
+ data_size = int(log["stack"][-2], 16)
+ data = bytes(read_from_memory(data_offset, data_size, log["memory"]))
+ event = CallTraceEvent(
+ name="UnknownEvent",
+ args=[data],
+ arg_names=["data"],
+ )
+ current_trace._events.append(event)
+ elif log["op"] in {"LOG1", "LOG2", "LOG3", "LOG4"}:
+ assert current_trace is not None
+ data_offset = int(log["stack"][-1], 16)
+ data_size = int(log["stack"][-2], 16)
+ topics_count = int(log["op"][3:])
+ topics = [
+ bytes.fromhex(log["stack"][-3 - i][2:].zfill(64))
+ for i in range(topics_count)
+ ]
+ data = bytes(read_from_memory(data_offset, data_size, log["memory"]))
+ try:
+ event_args, event_names = _decode_event_args(
+ current_trace._abi[topics[0]]["inputs"],
+ topics[1:],
+ data,
+ chain,
+ )
+ event = CallTraceEvent(
+ name=current_trace._abi[topics[0]]["name"],
+ args=event_args,
+ arg_names=event_names,
+ )
+ current_trace._events.append(event)
+ except Exception as ex:
+ event = CallTraceEvent(
+ name="UnknownEvent",
+ args=topics + [data],
+ arg_names=[f"topic{i}" for i in range(topics_count)] + ["data"],
+ )
+ current_trace._events.append(event)
return root_trace
diff --git a/wake/development/core.py b/wake/development/core.py
index 154ddcda..9562495b 100644
--- a/wake/development/core.py
+++ b/wake/development/core.py
@@ -2471,11 +2471,9 @@ def _process_events(self, tx: TransactionAbc) -> list:
for input in fix_library_abi(abi["inputs"]):
if input["indexed"]:
- if (
- input["type"] in {"string", "bytes"}
- or input["internalType"].startswith("struct ")
- or input["type"].endswith("]")
- ):
+ if input["type"] in {"string", "bytes", "tuple"} or input[
+ "type"
+ ].endswith("]"):
topic_type = "bytes32"
else:
topic_type = input["type"]