Skip to content

Commit

Permalink
Use new node contract in registry
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Aug 11, 2024
1 parent 31b5549 commit 47898c7
Show file tree
Hide file tree
Showing 22 changed files with 766 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
args: --timeout=5m --config dev/.golangci.yaml
args: --timeout=5m --config .golangci.yaml
File renamed without changes.
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ mockname: "Mock{{.InterfaceName}}"
outpkg: mocks
filename: "mock_{{.InterfaceName}}.go"
packages:
github.com/xmtp/xmtpd/pkg/registry:
interfaces:
NodesContract:
github.com/xmtp/xmtpd/pkg/indexer/blockchain:
interfaces:
ChainClient:
Expand Down
42 changes: 21 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

`xmtpd` (XMTP daemon) is an experimental version of XMTP node software. It is **not** the node software that currently forms the XMTP network.

After `xmtpd` meets specific functional requirements, the plan is for it to become the node software that powers the XMTP network.
After `xmtpd` meets specific functional requirements, the plan is for it to become the node software that powers the XMTP network.

Some of these requirements include reaching functional parity with the current node software and reliably performing data replication without data loss.
Some of these requirements include reaching functional parity with the current node software and reliably performing data replication without data loss.

To keep up with and provide feedback about `xmtpd` development, see the [Issues tab](https://github.com/xmtp/xmtpd/issues) in this repo.

Expand Down Expand Up @@ -46,7 +46,7 @@ dev/down
To start the `xmtpd` node, run:

```sh
dev/start
dev/run
```

## Test the node
Expand All @@ -57,27 +57,27 @@ To run tests against the `xmtpd` node, run:
dev/test
```

These tests provide a full suite of unit and integration tests for the `xmtpd` repo to help ensure and maintain correctness of the code over time and to avoid regressions as the code evolves. You can explore the tests by taking a look at any files with the suffix `_test.go`.
These tests provide a full suite of unit and integration tests for the `xmtpd` repo to help ensure and maintain correctness of the code over time and to avoid regressions as the code evolves. You can explore the tests by taking a look at any files with the suffix `_test.go`.

## Monitor the node

The `xmtpd` node build provides two options for monitoring your node.

- To access your local Prometheus instance to explore node metrics, run:

```sh
open http://localhost:9090
```
```sh
open http://localhost:9090
```

To learn how to query node data in Prometheus, see [Metric Types in Prometheus and PromQL](https://promlabs.com/blog/2020/09/25/metric-types-in-prometheus-and-promql) and [The Anatomy of a PromQL Query](https://promlabs.com/blog/2020/06/18/the-anatomy-of-a-promql-query/).
To learn how to query node data in Prometheus, see [Metric Types in Prometheus and PromQL](https://promlabs.com/blog/2020/09/25/metric-types-in-prometheus-and-promql) and [The Anatomy of a PromQL Query](https://promlabs.com/blog/2020/06/18/the-anatomy-of-a-promql-query/).

- To access your local Grafana instance to explore and build node dashboards, run:

```sh
open http://localhost:3000
```
```sh
open http://localhost:3000
```

To learn how to visualize node data in Grafana, see [Prometheus Histograms with Grafana Heatmaps](https://towardsdatascience.com/prometheus-histograms-with-grafana-heatmaps-d556c28612c7) and [How to visualize Prometheus histograms in Grafana](https://grafana.com/blog/2020/06/23/how-to-visualize-prometheus-histograms-in-grafana/).
To learn how to visualize node data in Grafana, see [Prometheus Histograms with Grafana Heatmaps](https://towardsdatascience.com/prometheus-histograms-with-grafana-heatmaps-d556c28612c7) and [How to visualize Prometheus histograms in Grafana](https://grafana.com/blog/2020/06/23/how-to-visualize-prometheus-histograms-in-grafana/).

# Contributing

Expand All @@ -87,24 +87,24 @@ Please follow the [style guide](https://google.github.io/styleguide/go/decisions

Submit and land a PR to https://github.com/xmtp/proto. Then run:

```sh
dev/generate
```
```sh
dev/generate
```

## Modifying the database schema

Create a new migration by running:

```sh
dev/gen-migration
```
```sh
dev/gen-migration
```

Fill in the migrations in the generated files. If you are unfamiliar with migrations, you may follow [this guide](https://github.com/golang-migrate/migrate/blob/master/MIGRATIONS.md). The database is PostgreSQL and the driver is PGX.

## Modifying database queries

We use [sqlc](https://docs.sqlc.dev/en/latest/index.html) to generate the code for our DB queries. Modify the `queries.sql` file, and then run:

```sh
sqlc generate
```
```sh
sqlc generate
```
5 changes: 3 additions & 2 deletions cmd/replication/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"

"github.com/jessevdk/go-flags"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/registry"
"github.com/xmtp/xmtpd/pkg/server"
"github.com/xmtp/xmtpd/pkg/tracing"
Expand All @@ -18,7 +19,7 @@ import (

var Commit string

var options server.Options
var options config.ServerOptions

func main() {
if _, err := flags.Parse(&options); err != nil {
Expand Down Expand Up @@ -81,7 +82,7 @@ func fatal(msg string, args ...any) {
log.Fatalf(msg, args...)
}

func buildLogger(options server.Options) (*zap.Logger, *zap.Config, error) {
func buildLogger(options config.ServerOptions) (*zap.Logger, *zap.Config, error) {
atom := zap.NewAtomicLevel()
level := zapcore.InfoLevel
err := level.Set(options.LogLevel)
Expand Down
1 change: 1 addition & 0 deletions dev/generate
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
set -e

go generate ./...
rm -f pkg/mocks/*
mockery
./dev/abigen

Expand Down
11 changes: 11 additions & 0 deletions dev/local.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

source dev/contracts/.env

export CHAIN_RPC_URL=$DOCKER_RPC_URL # From contracts/.env
export NODE_PRIVATE_KEY=$PRIVATE_KEY # From contracts/.env
export WRITER_CONNECTION_STRING="postgres://postgres:xmtp@localhost:8765/postgres?sslmode=disable"
NODES_CONTRACT_ADDRESS="$(jq -r '.deployedTo' build/Nodes.json)" # Built by contracts/deploy-local
export NODES_CONTRACT_ADDRESS
GROUP_MESSAGES_CONTRACT_ADDRESS="$(jq -r '.deployedTo' build/GroupMessages.json)" # Built by contracts/deploy-local
export GROUP_MESSAGES_CONTRACT_ADDRESS
12 changes: 12 additions & 0 deletions dev/run
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash

set -eu

. dev/local.env

go run cmd/replication/main.go \
--db.writer-connection-string=$WRITER_CONNECTION_STRING \
--private-key=${NODE_PRIVATE_KEY} \
--contracts.nodes-address=$NODES_CONTRACT_ADDRESS \
--contracts.messages-address=$GROUP_MESSAGES_CONTRACT_ADDRESS \
--contracts.rpc-url=$CHAIN_RPC_URL
19 changes: 12 additions & 7 deletions pkg/server/options.go → pkg/config/options.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package server
package config

import (
"time"

"github.com/xmtp/xmtpd/pkg/indexer"
)

type ApiOptions struct {
Port int `short:"p" long:"port" description:"Port to listen on" default:"5050"`
}

type ContractsOptions struct {
RpcUrl string `long:"rpc-url" description:"Blockchain RPC URL"`
NodesContractAddress string `long:"nodes-address" description:"Node contract address"`
MessagesContractAddress string `long:"messages-address" description:"Message contract address"`
RefreshInterval time.Duration `long:"refresh-interval" description:"Refresh interval" default:"60s"`
}

type DbOptions struct {
ReaderConnectionString string `long:"reader-connection-string" description:"Reader connection string"`
WriterConnectionString string `long:"writer-connection-string" description:"Writer connection string" required:"true"`
Expand All @@ -19,14 +24,14 @@ type DbOptions struct {
WaitForDB time.Duration `long:"wait-for" description:"wait for DB on start, up to specified duration"`
}

type Options struct {
type ServerOptions struct {
LogLevel string `short:"l" long:"log-level" description:"Define the logging level, supported strings are: DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL, and their lower-case forms." default:"INFO"`
//nolint:staticcheck
LogEncoding string `long:"log-encoding" description:"Log encoding format. Either console or json" choice:"console" choice:"json" default:"console"`

PrivateKeyString string `long:"private-key" description:"Private key to use for the node"`

API ApiOptions `group:"API Options" namespace:"api"`
DB DbOptions `group:"Database Options" namespace:"db"`
Contracts indexer.ContractsOptions `group:"Contracts Options" namespace:"contracts"`
API ApiOptions `group:"API Options" namespace:"api"`
DB DbOptions `group:"Database Options" namespace:"db"`
Contracts ContractsOptions `group:"Contracts Options" namespace:"contracts"`
}
31 changes: 26 additions & 5 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/indexer/blockchain"
"github.com/xmtp/xmtpd/pkg/indexer/storer"
Expand All @@ -15,16 +16,31 @@ import (
)

// Start the indexer and run until the context is canceled
func StartIndexer(ctx context.Context, logger *zap.Logger, queries *queries.Queries, options ContractsOptions) error {
builder := blockchain.NewRpcLogStreamBuilder(options.RpcUrl, logger)
func StartIndexer(
ctx context.Context,
logger *zap.Logger,
queries *queries.Queries,
cfg config.ContractsOptions,
) error {
builder := blockchain.NewRpcLogStreamBuilder(cfg.RpcUrl, logger)

messagesTopic, err := buildMessagesTopic()
if err != nil {
return err
}

messagesChannel := builder.ListenForContractEvent(0, common.HexToAddress(options.MessagesContractAddress), []common.Hash{messagesTopic})
indexLogs(ctx, messagesChannel, logger.Named("indexLogs").With(zap.String("contractAddress", options.MessagesContractAddress)), storer.NewGroupMessageStorer(queries, logger))
messagesChannel := builder.ListenForContractEvent(
0,
common.HexToAddress(cfg.MessagesContractAddress),
[]common.Hash{messagesTopic},
)

indexLogs(
ctx,
messagesChannel,
logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)),
storer.NewGroupMessageStorer(queries, logger),
)

streamer, err := builder.Build()
if err != nil {
Expand All @@ -41,7 +57,12 @@ If an event fails to be stored, and the error is retryable, it will sleep for 10
The only non-retriable errors should be things like malformed events or failed validations.
*/
func indexLogs(ctx context.Context, eventChannel <-chan types.Log, logger *zap.Logger, logStorer storer.LogStorer) {
func indexLogs(
ctx context.Context,
eventChannel <-chan types.Log,
logger *zap.Logger,
logStorer storer.LogStorer,
) {
var err storer.LogStorageError
// We don't need to listen for the ctx.Done() here, since the eventChannel will be closed when the parent context is canceled
for event := range eventChannel {
Expand Down
7 changes: 0 additions & 7 deletions pkg/indexer/options.go

This file was deleted.

94 changes: 94 additions & 0 deletions pkg/mocks/mock_NodesContract.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 47898c7

Please sign in to comment.