Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge main into feat publisher refactor #91

Merged
merged 20 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ help:

build:
cd ${cmd_dir} && \
go build -v -ldflags="-X main.appVersion=$(git describe --tags --long --dirty)" -o ${binary}
go build -v -ldflags="-X main.appVersion=$(shell git describe --tags --long --dirty)" -o ${binary}

publisher_type="rabbitmq"
run: build
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,20 @@ deprecated in the future.
Using the `cmd/notifier` package as root, execute the following commands:

- install go dependencies: `go install`
- build executable: `go build -o event-notifier`
- build executable: `go build -ldflags="-X main.appVersion=$(git describe --tags --long --dirty)" -o event-notifier`
- run `./event-notifier`

Or use the build script:
```bash
bash scripts/build.sh
```

---

This can also be done using a single command from `Makefile`:
```bash
# by default, rabbitmq type
# `run` command will also trigger make `build` command
make run

# specify notifier running mode (eq: rabbitmq, ws)
Expand Down
15 changes: 10 additions & 5 deletions cmd/notifier/config/config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
[General]
# CheckDuplicates signals if the events received from observers have been already pushed to clients
# Requires a redis instance/cluster and should be used when multiple observers push from the same shard
CheckDuplicates = true

# ExternalMarshaller is used for handling incoming/outcoming api requests
[General.ExternalMarshaller]
Type = "json"
Expand All @@ -13,10 +17,6 @@
Prefix = "erd"
Length = 32

# CheckDuplicates signals if the events received from observers have been already pushed to clients
# Requires a redis instance/cluster and should be used when multiple observers push from the same shard
CheckDuplicates = true

[WebSocketConnector]
# Enabled will determine if websocket connector will be enabled or not
Enabled = false
Expand All @@ -37,14 +37,19 @@
# Signals if in case of data payload processing error, we should send the ack signal or not
BlockingAckOnError = false

# Set to true to drop messages if there is no active WebSocket connection to send to.
DropMessagesIfNoConnection = false

# After a message will be sent it will wait for an ack message if this flag is enabled
WithAcknowledge = true

# The duration in seconds to wait for an acknowledgment message, after this time passes an error will be returned
AcknowledgeTimeoutInSec = 60

[ConnectorApi]
# Enabled will determine if http connector will be enabled or not
# Enabled will determine if http connector will be enabled or not.
# It will determine if http connector endpoints will be created.
# If set to false, the web server will still be created for other endpoints (for metrics, or for WS if needed)
Enabled = true

# The address on which the events notifier listens for subscriptions
Expand Down
273 changes: 273 additions & 0 deletions config/tomlConfig_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
package config_test

import (
"strconv"
"testing"

"github.com/multiversx/mx-chain-notifier-go/config"
"github.com/pelletier/go-toml"
"github.com/stretchr/testify/require"
)

func TestMainConfig(t *testing.T) {
t.Parallel()

generalMarshallerType := "json"
adrConverterType := "bech32"
adrConverterPrefix := "erd"
adrConverterLength := 32
checkDuplicates := true

connectorAPIHost := "5000"
connectorAPIUsername := "guest"
connectorAPIPassword := "guest"

redisURL := "redis://localhost:6379/0"
redisMasterName := "mymaster"
redisSentinelURL := "localhost:26379"
redisConnectionType := "sentinel"
redisTTL := 30

rabbitmqURL := "amqp://guest:guest@localhost:5672"
rabbitmqEventsExchangeName := "all_events"
rabbitmqEventsExchangeType := "fanout"
rabbitmqRevertExchangeName := "revert_events"
rabbitmqRevertExchangeType := "fanout"

wsConnURL := "localhost:22111"
wsConnMode := "server"
wsConnRetryDurationInSec := 5
wsConnAckTimeoutInSec := 60
wsConnMarshallerType := "gogo protobuf"

expectedConfig := config.MainConfig{
General: config.GeneralConfig{
ExternalMarshaller: config.MarshallerConfig{
Type: generalMarshallerType,
},
AddressConverter: config.AddressConverterConfig{
Type: adrConverterType,
Prefix: adrConverterPrefix,
Length: adrConverterLength,
},
CheckDuplicates: checkDuplicates,
},
WebSocketConnector: config.WebSocketConfig{
Enabled: true,
URL: wsConnURL,
Mode: wsConnMode,
RetryDurationInSec: wsConnRetryDurationInSec,
AcknowledgeTimeoutInSec: wsConnAckTimeoutInSec,
WithAcknowledge: true,
BlockingAckOnError: false,
DropMessagesIfNoConnection: false,
DataMarshallerType: wsConnMarshallerType,
},
ConnectorApi: config.ConnectorApiConfig{
Enabled: true,
Host: connectorAPIHost,
Username: connectorAPIUsername,
Password: connectorAPIPassword,
},
Redis: config.RedisConfig{
Url: redisURL,
MasterName: redisMasterName,
SentinelUrl: redisSentinelURL,
ConnectionType: redisConnectionType,
TTL: uint32(redisTTL),
},
RabbitMQ: config.RabbitMQConfig{
Url: rabbitmqURL,
EventsExchange: config.RabbitMQExchangeConfig{
Name: rabbitmqEventsExchangeName,
Type: rabbitmqEventsExchangeType,
},
RevertEventsExchange: config.RabbitMQExchangeConfig{
Name: rabbitmqRevertExchangeName,
Type: rabbitmqRevertExchangeType,
},
},
}

testString := `
[General]
# CheckDuplicates signals if the events received from observers have been already pushed to clients
# Requires a redis instance/cluster and should be used when multiple observers push from the same shard
CheckDuplicates = true

# ExternalMarshaller is used for handling incoming/outcoming api requests
[General.ExternalMarshaller]
Type = "` + generalMarshallerType + `"
# InternalMarshaller is used for handling internal structs
# This has to be mapped with the internal marshalling used for notifier outport driver
[General.InternalMarshaller]
Type = "` + generalMarshallerType + `"

# Address pubkey converter config options
[General.AddressConverter]
Type = "` + adrConverterType + `"
Prefix = "` + adrConverterPrefix + `"
Length = ` + strconv.Itoa(adrConverterLength) + `

[WebSocketConnector]
# Enabled will determine if websocket connector will be enabled or not
Enabled = true

# URL for the WebSocket client/server connection
# This value represents the IP address and port number that the WebSocket client or server will use to establish a connection.
URL = "` + wsConnURL + `"

# This flag describes the mode to start the WebSocket connector. Can be "client" or "server"
Mode = "` + wsConnMode + `"

# Possible values: json, gogo protobuf. Should be compatible with mx-chain-node outport driver config
DataMarshallerType = "` + wsConnMarshallerType + `"

# Retry duration (receive/send ack signal) in seconds
RetryDurationInSec = ` + strconv.Itoa(wsConnRetryDurationInSec) + `

# Signals if in case of data payload processing error, we should send the ack signal or not
BlockingAckOnError = false

# Set to true to drop messages if there is no active WebSocket connection to send to.
DropMessagesIfNoConnection = false

# After a message will be sent it will wait for an ack message if this flag is enabled
WithAcknowledge = true

# The duration in seconds to wait for an acknowledgment message, after this time passes an error will be returned
AcknowledgeTimeoutInSec = ` + strconv.Itoa(wsConnAckTimeoutInSec) + `

[ConnectorApi]
# Enabled will determine if http connector will be enabled or not.
# It will determine if http connector endpoints will be created.
# If set to false, the web server will still be created for other endpoints (for metrics, or for WS if needed)
Enabled = true

# The address on which the events notifier listens for subscriptions
# It can be specified as "localhost:5000" or only as "5000"
Host = "` + connectorAPIHost + `"

# Username and Password needed to authorize the connector
# BasicAuth is enabled only for the endpoints with "Auth" flag enabled
# in api.toml config file
Username = "` + connectorAPIUsername + `"
Password = "` + connectorAPIPassword + `"

[Redis]
# The url used to connect to a pubsub server
Url = "` + redisURL + `"

# The master name for failover client
MasterName = "` + redisMasterName + `"

# The sentinel url for failover client
SentinelUrl = "` + redisSentinelURL + `"

# The redis connection type. Options: | instance | sentinel |
# instance - it will try to connect to a single redis instance
# sentinel - it will try to connect to redis setup with master, slave and sentinel instances
ConnectionType = "` + redisConnectionType + `"

# Time to live (in minutes) for redis lock entry
TTL = ` + strconv.Itoa(redisTTL) + `

[RabbitMQ]
# The url used to connect to a rabbitMQ server
# Note: not required for running in the notifier mode
Url = "` + rabbitmqURL + `"

# The exchange which holds all logs and events
[RabbitMQ.EventsExchange]
Name = "` + rabbitmqEventsExchangeName + `"
Type = "` + rabbitmqEventsExchangeType + `"

# The exchange which holds revert events
[RabbitMQ.RevertEventsExchange]
Name = "` + rabbitmqRevertExchangeName + `"
Type = "` + rabbitmqRevertExchangeType + `"
`

config := config.MainConfig{}

err := toml.Unmarshal([]byte(testString), &config)
require.Nil(t, err)
require.Equal(t, expectedConfig, config)
}

func TestAPIConfig(t *testing.T) {
t.Parallel()

expectedAPIConfig := config.APIRoutesConfig{
APIPackages: map[string]config.APIPackageConfig{
"events": {
Routes: []config.RouteConfig{
{
Name: "/push",
Open: true,
Auth: false,
},
{
Name: "/revert",
Open: true,
Auth: false,
},
{
Name: "/finalized",
Open: true,
Auth: false,
},
},
},
"hub": {
Routes: []config.RouteConfig{
{
Name: "/ws",
Open: true,
},
},
},
"status": {
Routes: []config.RouteConfig{
{
Name: "/metrics",
Open: true,
},
{
Name: "/prometheus-metrics",
Open: true,
},
},
},
},
}

testString := `
# API routes configuration
[APIPackages]

[APIPackages.events]
Routes = [
{ Name = "/push", Open = true, Auth = false },
{ Name = "/revert", Open = true, Auth = false },
{ Name = "/finalized", Open = true, Auth = false },
]

[APIPackages.hub]
Routes = [
{ Name = "/ws", Open = true },
]

[APIPackages.status]
Routes = [
{ Name = "/metrics", Open = true },
{ Name = "/prometheus-metrics", Open = true },
]
`

config := config.APIRoutesConfig{}

err := toml.Unmarshal([]byte(testString), &config)
require.Nil(t, err)
require.Equal(t, expectedAPIConfig, config)
}
18 changes: 17 additions & 1 deletion data/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/multiversx/mx-chain-core-go/core"
nodeData "github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/alteredAccount"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/data/receipt"
"github.com/multiversx/mx-chain-core-go/data/rewardTx"
Expand Down Expand Up @@ -44,10 +45,25 @@ type ArgsSaveBlockData struct {
NumberOfShards uint32
}

// OutportBlockDataOld holds the block data that will be received on push events
// TODO: remove on next iterations, new versions will use outport driver structs from
// core repository, which will be backwards compatible from now on
type OutportBlockDataOld struct {
HeaderHash []byte
Body *block.Body
TransactionsPool *TransactionsPool
SignersIndexes []uint64
NotarizedHeadersHashes []string
HeaderGasConsumption outport.HeaderGasConsumption
AlteredAccounts map[string]*alteredAccount.AlteredAccount
NumberOfShards uint32
IsImportDB bool
}

// ArgsSaveBlock holds block data with header type
type ArgsSaveBlock struct {
HeaderType core.HeaderType
ArgsSaveBlockData
OutportBlockDataOld
}

// LogData holds the data needed for indexing logs and events
Expand Down
Loading
Loading