Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Naive IdentityUpdate storage #156

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 77 additions & 13 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,81 @@ func StartIndexer(
}
builder := blockchain.NewRpcLogStreamBuilder(client, logger)

messagesTopic, err := buildMessagesTopic()
streamer, err := configureLogStream(builder, cfg)
if err != nil {
return err
}

messagesChannel := builder.ListenForContractEvent(
0,
common.HexToAddress(cfg.MessagesContractAddress),
[]common.Hash{messagesTopic},
)

streamer, err := builder.Build()
messagesContract, err := messagesContract(cfg, client)
if err != nil {
return err
}

messagesContract, err := messagesContract(cfg, client)
go indexLogs(
ctx,
streamer.messagesChannel,
logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)),
storer.NewGroupMessageStorer(queries, logger, messagesContract),
)

identityUpdatesContract, err := identityUpdatesContract(cfg, client)
if err != nil {
return err
}

go indexLogs(
ctx,
messagesChannel,
logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)),
storer.NewGroupMessageStorer(queries, logger, messagesContract),
streamer.identityUpdatesChannel,
logger.Named("indexLogs").
With(zap.String("contractAddress", cfg.IdentityUpdatesContractAddress)),
storer.NewIdentityUpdateStorer(queries, logger, identityUpdatesContract),
)

return streamer.streamer.Start(ctx)
}

type builtStreamer struct {
streamer *blockchain.RpcLogStreamer
messagesChannel <-chan types.Log
identityUpdatesChannel <-chan types.Log
}

func configureLogStream(
builder *blockchain.RpcLogStreamBuilder,
cfg config.ContractsOptions,
) (*builtStreamer, error) {
messagesTopic, err := buildMessagesTopic()
if err != nil {
return nil, err
}

messagesChannel := builder.ListenForContractEvent(
0,
common.HexToAddress(cfg.MessagesContractAddress),
[]common.Hash{messagesTopic},
)

identityUpdatesTopic, err := buildIdentityUpdatesTopic()
if err != nil {
return nil, err
}

identityUpdatesChannel := builder.ListenForContractEvent(
0,
common.HexToAddress(cfg.IdentityUpdatesContractAddress),
[]common.Hash{identityUpdatesTopic},
)

return streamer.Start(ctx)
streamer, err := builder.Build()
if err != nil {
return nil, err
}

return &builtStreamer{
streamer: streamer,
messagesChannel: messagesChannel,
identityUpdatesChannel: identityUpdatesChannel,
}, nil
}

/*
Expand Down Expand Up @@ -103,6 +149,14 @@ func buildMessagesTopic() (common.Hash, error) {
return utils.GetEventTopic(abi, "MessageSent")
}

func buildIdentityUpdatesTopic() (common.Hash, error) {
abi, err := abis.IdentityUpdatesMetaData.GetAbi()
if err != nil {
return common.Hash{}, err
}
return utils.GetEventTopic(abi, "IdentityUpdateCreated")
}

func messagesContract(
cfg config.ContractsOptions,
client *ethclient.Client,
Expand All @@ -112,3 +166,13 @@ func messagesContract(
client,
)
}

func identityUpdatesContract(
cfg config.ContractsOptions,
client *ethclient.Client,
) (*abis.IdentityUpdates, error) {
return abis.NewIdentityUpdates(
common.HexToAddress(cfg.IdentityUpdatesContractAddress),
client,
)
}
8 changes: 6 additions & 2 deletions pkg/indexer/storer/groupMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ func NewGroupMessageStorer(
logger *zap.Logger,
contract *abis.GroupMessages,
) *GroupMessageStorer {
return &GroupMessageStorer{queries: queries, logger: logger, contract: contract}
return &GroupMessageStorer{
queries: queries,
logger: logger.Named("GroupMessageStorer"),
contract: contract,
}
}

// Validate and store a group message log event
Expand Down Expand Up @@ -52,5 +56,5 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS

func BuildGroupMessageTopic(groupId [32]byte) string {
// We should think about simplifying the topics, since backwards compatibility shouldn't really matter here
return fmt.Sprintf("/xmtp/1/g-%x/proto", groupId)
return fmt.Sprintf("1/m/%x", groupId)
}
63 changes: 63 additions & 0 deletions pkg/indexer/storer/identityUpdate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package storer

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/core/types"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/db/queries"
"go.uber.org/zap"
)

type IdentityUpdateStorer struct {
contract *abis.IdentityUpdates
queries *queries.Queries
logger *zap.Logger
}

func NewIdentityUpdateStorer(
queries *queries.Queries,
logger *zap.Logger,
contract *abis.IdentityUpdates,
) *IdentityUpdateStorer {
return &IdentityUpdateStorer{
queries: queries,
logger: logger.Named("IdentityUpdateStorer"),
contract: contract,
}
}

// Validate and store an identity update log event
func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError {
msgSent, err := s.contract.ParseIdentityUpdateCreated(event)
if err != nil {
return NewLogStorageError(err, false)
}

// TODO:nm figure out topic structure
topic := BuildInboxTopic(msgSent.InboxId)

s.logger.Debug("Inserting identity update from contract", zap.String("topic", topic))

/**
TODO:nm validate the identity update
**/

if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
// We may not want to hardcode this to 0 and have an originator ID for each smart contract?
OriginatorNodeID: 0,
OriginatorSequenceID: int64(msgSent.SequenceId),
Topic: []byte(topic),
OriginatorEnvelope: msgSent.Update, // TODO:nm parse originator envelope and do some validation
}); err != nil {
s.logger.Error("Error inserting envelope from smart contract", zap.Error(err))
return NewLogStorageError(err, true)
}

return nil
}

func BuildInboxTopic(inboxId [32]byte) string {
return fmt.Sprintf("1/i/%x", inboxId)
}
68 changes: 68 additions & 0 deletions pkg/indexer/storer/identityUpdate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package storer

import (
"context"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/testutils"
)

func buildIdentityUpdateStorer(t *testing.T) (*IdentityUpdateStorer, func()) {
ctx, cancel := context.WithCancel(context.Background())
db, _, cleanup := testutils.NewDB(t, ctx)
queryImpl := queries.New(db)
config := testutils.GetContractsOptions(t)
contractAddress := config.IdentityUpdatesContractAddress

client, err := blockchain.NewClient(ctx, config.RpcUrl)
require.NoError(t, err)
contract, err := abis.NewIdentityUpdates(
common.HexToAddress(contractAddress),
client,
)

require.NoError(t, err)
storer := NewIdentityUpdateStorer(queryImpl, testutils.NewLog(t), contract)

return storer, func() {
cancel()
cleanup()
}
}

func TestStoreIdentityUpdate(t *testing.T) {
ctx := context.Background()
storer, cleanup := buildIdentityUpdateStorer(t)
defer cleanup()

// Using the RandomInboxId function, since they are both 32 bytes and we treat inbox IDs as
// strings outside the blockchain
inboxId := testutils.RandomGroupID()
message := testutils.RandomBytes(30)
sequenceID := uint64(1)

logMessage := testutils.BuildIdentityUpdateLog(t, inboxId, message, sequenceID)

err := storer.StoreLog(
ctx,
logMessage,
)
require.NoError(t, err)

envelopes, queryErr := storer.queries.SelectGatewayEnvelopes(
ctx,
queries.SelectGatewayEnvelopesParams{OriginatorNodeID: db.NullInt32(0)},
)
require.NoError(t, queryErr)

require.Equal(t, len(envelopes), 1)

firstEnvelope := envelopes[0]
require.Equal(t, firstEnvelope.OriginatorEnvelope, message)
}
30 changes: 30 additions & 0 deletions pkg/testutils/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,36 @@ func BuildMessageSentLog(
}
}

func BuildIdentityUpdateEvent(inboxId [32]byte, update []byte, sequenceID uint64) ([]byte, error) {
abi, err := abis.IdentityUpdatesMetaData.GetAbi()
if err != nil {
return nil, err
}
return abi.Events["IdentityUpdateCreated"].Inputs.Pack(inboxId, update, sequenceID)
}

// Build a log message for an IdentityUpdateCreated event
func BuildIdentityUpdateLog(
t *testing.T,
inboxId [32]byte,
update []byte,
sequenceID uint64,
) types.Log {
eventData, err := BuildIdentityUpdateEvent(inboxId, update, sequenceID)
require.NoError(t, err)

abi, err := abis.IdentityUpdatesMetaData.GetAbi()
require.NoError(t, err)

topic, err := utils.GetEventTopic(abi, "IdentityUpdateCreated")
require.NoError(t, err)

return types.Log{
Topics: []common.Hash{topic},
Data: eventData,
}
}

/*
*
Deploy a contract and return the contract's address. Will return a different address for each run, making it suitable for testing
Expand Down
Loading