Skip to content

Commit

Permalink
WIP: logs
Browse files Browse the repository at this point in the history
  • Loading branch information
dwasse committed Sep 28, 2023
1 parent a77180a commit 3549fc7
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 8 deletions.
2 changes: 1 addition & 1 deletion agents/agents/executor/db/sql/base/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s Store) GetLastBlockNumber(ctx context.Context, chainID uint32, contractT
Where(fmt.Sprintf("%s = ?", ChainIDFieldName), chainID).
Select(fmt.Sprintf("MAX(%s)", BlockNumberFieldName)).
Find(&lastBlockNumber)
case types.LightInboxContract, types.SummitContract:
case types.LightInboxContract, types.SummitContract, types.DestinationContract:
dbTx = preDBTx.Model(&Attestation{}).
Where(fmt.Sprintf("%s = ?", DestinationFieldName), chainID).
Select(fmt.Sprintf("MAX(%s)", DestinationBlockNumberFieldName)).
Expand Down
14 changes: 13 additions & 1 deletion agents/agents/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math/big"
"reflect"
"strconv"
"time"

Expand Down Expand Up @@ -278,6 +279,10 @@ func (e Executor) Run(parentCtx context.Context) error {
return e.streamLogs(ctx, e.grpcClient, e.grpcConn, chain.ChainID, chain.LightInboxAddress, execTypes.LightInboxContract)
})

g.Go(func() error {
return e.streamLogs(ctx, e.grpcClient, e.grpcConn, chain.ChainID, chain.DestinationAddress, execTypes.DestinationContract)
})

g.Go(func() error {
return e.receiveLogs(ctx, chain.ChainID)
})
Expand Down Expand Up @@ -659,15 +664,18 @@ func (e Executor) checkIfExecuted(parentCtx context.Context, message types.Messa
//
//nolint:cyclop
func (e Executor) streamLogs(ctx context.Context, grpcClient pbscribe.ScribeServiceClient, conn *grpc.ClientConn, chainID uint32, address string, contractType execTypes.ContractType) error {
fmt.Printf("streamLogs on chain %d, address %s, contract type %s\n", chainID, address, contractType.String())
lastStoredBlock, err := e.executorDB.GetLastBlockNumber(ctx, chainID, contractType)
if err != nil {
return fmt.Errorf("could not get last stored block: %w", err)
}

fromBlock := strconv.FormatUint(lastStoredBlock, 16)

toBlock := "latest"
// toBlock := "latest"
toBlock := "1000"

fmt.Printf("stream from block %v to %v\n", fromBlock, toBlock)
stream, err := grpcClient.StreamLogs(ctx, &pbscribe.StreamLogsRequest{
Filter: &pbscribe.LogFilter{
ContractAddress: &pbscribe.NullableString{Kind: &pbscribe.NullableString_Data{Data: address}},
Expand All @@ -683,6 +691,7 @@ func (e Executor) streamLogs(ctx context.Context, grpcClient pbscribe.ScribeServ
for {
select {
case <-e.chainExecutors[chainID].closeConnection:
fmt.Println("close stream conn")
err := stream.CloseSend()
if err != nil {
return fmt.Errorf("could not close stream: %w", err)
Expand All @@ -704,6 +713,7 @@ func (e Executor) streamLogs(ctx context.Context, grpcClient pbscribe.ScribeServ
if log == nil {
return fmt.Errorf("could not convert log")
}
fmt.Printf("received raw log on chain %d, addr %s with tx hash: %v\n", chainID, address, log.TxHash)

// We do not use a span context here because this is just meant to track transactions coming in.
_, span := e.handler.Tracer().Start(ctx, "executor.streamLog", trace.WithAttributes(
Expand All @@ -729,13 +739,15 @@ func (e Executor) streamLogs(ctx context.Context, grpcClient pbscribe.ScribeServ
//
//nolint:cyclop,gocognit
func (e Executor) processLog(parentCtx context.Context, log ethTypes.Log, chainID uint32) (err error) {
fmt.Printf("processLog on chain %d with hash %s [blockNumber=%d]\n", chainID, log.TxHash.Hex(), log.BlockNumber)
datatypeInterface, err := e.logToInterface(log, chainID)
if err != nil {
return fmt.Errorf("could not convert log to interface: %w", err)
}
if datatypeInterface == nil {
return nil
}
fmt.Printf("data type: %v\n", reflect.TypeOf(datatypeInterface))

ctx, span := e.handler.Tracer().Start(parentCtx, "processLog", trace.WithAttributes(
attribute.Int(metrics.ChainID, int(chainID)),
Expand Down
9 changes: 9 additions & 0 deletions agents/agents/executor/executor_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (e Executor) logToMessage(log ethTypes.Log, chainID uint32) (types.Message,

// logToAttestation converts the log to an attestation.
func (e Executor) logToAttestation(log ethTypes.Log, chainID uint32, summitAttestation bool) (types.Attestation, error) {
fmt.Printf("logToAttestation on chain %d and log tx hash %s, summitAttestation %v\n", chainID, log.TxHash.Hex(), summitAttestation)
var attestation types.Attestation
var ok bool

Expand Down Expand Up @@ -84,6 +85,7 @@ func (e Executor) logToInterface(log ethTypes.Log, chainID uint32) (any, error)
case e.isAttestationSavedEvent(log, chainID):
return e.logToAttestation(log, chainID, true)
default:
fmt.Printf("logToInterface: unknown event type on chain %d with log tx hash %s\n", chainID, log.TxHash.Hex())
//nolint:nilnil
return nil, nil
}
Expand All @@ -108,11 +110,14 @@ func (e Executor) isSentEvent(log ethTypes.Log, chainID uint32) bool {
}

func (e Executor) isAttestationAcceptedEvent(log ethTypes.Log, chainID uint32) bool {
fmt.Printf("isAttestationAcceptedEvent on chain %d with log tx hash %s\n", chainID, log.TxHash.Hex())
fmt.Printf("lightinboxparser: %v\n", e.chainExecutors[chainID].lightInboxParser)
if e.chainExecutors[chainID].lightInboxParser == nil {
return false
}

lightManagerEvent, ok := e.chainExecutors[chainID].lightInboxParser.EventType(log)
fmt.Printf("ok: %v, lightManagerEvent: %v\n", ok, lightManagerEvent)
return ok && lightManagerEvent == lightinbox.AttestationAcceptedEvent
}

Expand All @@ -127,6 +132,7 @@ func (e Executor) isAttestationSavedEvent(log ethTypes.Log, chainID uint32) bool

// processMessage processes and stores a message.
func (e Executor) processMessage(ctx context.Context, message types.Message, logBlockNumber uint64) error {
fmt.Printf("processMessage: %v\n", message)
merkleIndex := e.chainExecutors[message.OriginDomain()].merkleTree.NumOfItems()
leaf, err := message.ToLeaf()
if err != nil {
Expand Down Expand Up @@ -154,6 +160,7 @@ func (e Executor) processMessage(ctx context.Context, message types.Message, log

// processAttestation processes and stores an attestation.
func (e Executor) processSnapshot(ctx context.Context, snapshot types.Snapshot, logBlockNumber uint64) error {
fmt.Printf("processSnapshot: %v\n", snapshot)
for _, state := range snapshot.States() {
statePayload, err := state.Encode()
if err != nil {
Expand Down Expand Up @@ -188,6 +195,7 @@ func (e Executor) processSnapshot(ctx context.Context, snapshot types.Snapshot,

// processAttestation processes and stores an attestation.
func (e Executor) processAttestation(ctx context.Context, attestation types.Attestation, chainID uint32, logBlockNumber uint64) error {
fmt.Printf("processAttestation on chain %d: %v\n", chainID, attestation)
// If the attestation is on the SynChain, we can directly use its block number and timestamp.
if chainID == e.config.SummitChainID {
err := e.executorDB.StoreAttestation(ctx, attestation, chainID, attestation.BlockNumber().Uint64(), attestation.Timestamp().Uint64())
Expand Down Expand Up @@ -231,6 +239,7 @@ retryLoop:
}
}

fmt.Printf("storing attestation: %v\n", attestation)
err = e.executorDB.StoreAttestation(ctx, attestation, chainID, logBlockNumber, logHeader.Time)
if err != nil {
return fmt.Errorf("could not store attestation: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions agents/contracts/lightinbox/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func NewParser(lightInboxAddress common.Address) (Parser, error) {
}

func (p parserImpl) EventType(log ethTypes.Log) (_ EventType, ok bool) {
fmt.Printf("parse EventType for log with hash %s\n", log.TxHash.Hex())
for _, logTopic := range log.Topics {
eventType := eventTypeFromTopic(logTopic)
if eventType == nil {
Expand All @@ -41,6 +42,7 @@ func (p parserImpl) EventType(log ethTypes.Log) (_ EventType, ok bool) {
return *eventType, true
}
// return an unknown event to avoid cases where user failed to check the event type
fmt.Println("returning unknown event")
return EventType(len(topicMap()) + 2), false
}

Expand Down
3 changes: 3 additions & 0 deletions agents/contracts/lightinbox/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lightinbox

import (
"bytes"
"fmt"
"strings"

"github.com/ethereum/go-ethereum/accounts/abi"
Expand Down Expand Up @@ -38,7 +39,9 @@ func topicMap() map[EventType]common.Hash {
// eventTypeFromTopic gets the event type from the topic
// returns nil if the topic is not found.
func eventTypeFromTopic(ogTopic common.Hash) *EventType {
fmt.Printf("eventTypeFromTopic on topic %s\n", ogTopic.Hex())
for eventType, topic := range topicMap() {
fmt.Printf("checking eventType %v and topic %v\n", eventType, topic.Hex())
if bytes.Equal(ogTopic.Bytes(), topic.Bytes()) {
return &eventType
}
Expand Down
6 changes: 0 additions & 6 deletions packages/contracts-core/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,12 @@ devnet-deploy: ## This should be run exactly once to deploy the contracts to the
cp devnet.env .env

forge script script/DeployMessaging003SynChain.s.sol --ffi -f chain_a --private-key 63e21d10fd50155dbba0e7d3f7431a400b84b4c2ac1ee38872f82448fe3ecfb9 --broadcast
sleep 1
forge script script/DeployMessaging003LightChain.s.sol --ffi -f chain_b --private-key 63e21d10fd50155dbba0e7d3f7431a400b84b4c2ac1ee38872f82448fe3ecfb9 --broadcast
sleep 1
forge script script/DeployMessaging003LightChain.s.sol --ffi -f chain_c --private-key 63e21d10fd50155dbba0e7d3f7431a400b84b4c2ac1ee38872f82448fe3ecfb9 --broadcast
sleep 1

forge script script/DeployClients003.s.sol --ffi -f chain_a --private-key 63e21d10fd50155dbba0e7d3f7431a400b84b4c2ac1ee38872f82448fe3ecfb9 --broadcast
sleep 1
forge script script/DeployClients003.s.sol --ffi -f chain_b --private-key 63e21d10fd50155dbba0e7d3f7431a400b84b4c2ac1ee38872f82448fe3ecfb9 --broadcast
sleep 1
forge script script/DeployClients003.s.sol --ffi -f chain_c --private-key 63e21d10fd50155dbba0e7d3f7431a400b84b4c2ac1ee38872f82448fe3ecfb9 --broadcast
sleep 1

devnet-logs:
cd ../../docker/devnet && docker-compose logs -f
22 changes: 22 additions & 0 deletions tools/devnet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func main() {
if err != nil {
panic(err)
}
routes = [][2]uint32{{43, 44}, {44, 43}}
fmt.Printf("routes: %v\n", routes)

// Connect to Scribe.
Expand Down Expand Up @@ -483,6 +484,27 @@ func main() {
}
}

// time.Sleep(10 * time.Second)
// contract, ok := chainConfigs[43].Deployments[contractName].Contract.(domains.PingPongClientContract)
// if !ok {
// panic("could not cast contract")
// }
// destPingPongAddr := common.HexToAddress(chainConfigs[44].Deployments[contractName].ContractAddress)
// _, err = contract.DoPing(ctx, signer, 44, destPingPongAddr, 0)
// if err != nil {
// panic(err)
// }

// contract, ok = chainConfigs[44].Deployments[contractName].Contract.(domains.PingPongClientContract)
// if !ok {
// panic("could not cast contract")
// }
// destPingPongAddr = common.HexToAddress(chainConfigs[43].Deployments[contractName].ContractAddress)
// _, err = contract.DoPing(ctx, signer, 43, destPingPongAddr, 0)
// if err != nil {
// panic(err)
// }

err = g.Wait()
if err != nil {
panic(err)
Expand Down

0 comments on commit 3549fc7

Please sign in to comment.