diff --git a/cmd/monitoring/README.md b/cmd/monitoring/README.md index 0e055bdb5..9425aec80 100644 --- a/cmd/monitoring/README.md +++ b/cmd/monitoring/README.md @@ -24,9 +24,7 @@ An example of a compatible json encoded feeds configuration is: "contract_address_base58": "2jVYiZgQ5disuAUMxrF1LkUyhZuqvRCrx1LfB555XUUv", "transmissions_account_base58": "2jVYiZgQ5disuAUMxrF1LkUyhZuqvRCrx1LfB555XUUv", "state_account_base58": "2jVYiZgQ5disuAUMxrF1LkUyhZuqvRCrx1LfB555XUUv", - - "poll_interval_milliseconds": 1000 - }, + }, { "name": "link/usd", "path": "link-usd", @@ -38,8 +36,6 @@ An example of a compatible json encoded feeds configuration is: "contract_address_base58": "GUnMZPbhxkimy9ssXyPG8rVTPBPFzL24W4vFuxyEZm66", "transmissions_account_base58": "GUnMZPbhxkimy9ssXyPG8rVTPBPFzL24W4vFuxyEZm66", "state_account_base58": "GUnMZPbhxkimy9ssXyPG8rVTPBPFzL24W4vFuxyEZm66", - - "poll_interval_milliseconds": 1000 } ] ``` @@ -47,25 +43,28 @@ An example of a compatible json encoded feeds configuration is: To build and execute the monitor locally, run: ```bash -go run ./cmd/monitoring/*.go \ --solana.rpc_endpoint="http://127.0.0.1:8899" \ --solana.network_name="solana-devnet" \ --solana.network_id="solana-devnet" \ --solana.chain_id="1" \ --kafka.config_set_topic="solana-devnet" \ --kafka.config_set_simplified_topic="solana-devnet" \ --kafka.transmission_topic="solana-devnet" \ --kafka.brokers="localhost:29092" \ --kafka.client_id="solana" \ --kafka.security_protocol="PLAINTEXT" \ --kafka.sasl_mechanism="PLAIN" \ --kafka.sasl_username="" \ --kafka.sasl_password="" \ --schema_registry.url="http://localhost:8989" \ --schema_registry.username="" \ --schema_registry.password="" \ --feeds.file_path="/tmp/feeds.json" \ --http.address="localhost:3000" +SOLANA_RPC_ENDPOINT="http://127.0.0.1:8899" \ +SOLANA_NETWORK_NAME="solana-devnet" \ +SOLANA_NETWORK_ID="solana-devnet" \ +SOLANA_CHAIN_ID="1" \ +SOLANA_READ_TIMEOUT="2s" \ +SOLANA_POLL_INTERVAL="5s" \ +KAFKA_BROKERS="localhost:29092" \ 
+KAFKA_CLIENT_ID="solana" \ +KAFKA_SECURITY_PROTOCOL="PLAINTEXT" \ +KAFKA_SASL_MECHANISM="PLAIN" \ +KAFKA_SASL_USERNAME="" \ +KAFKA_SASL_PASSWORD="" \ +KAFKA_CONFIG_SET_TOPIC="config_set" \ +KAFKA_CONFIG_SET_SIMPLIFIED_TOPIC="config_set_simplified" \ +KAFKA_TRANSMISSION_TOPIC="transmission_topic" \ +SCHEMA_REGISTRY_URL="http://localhost:8989" \ +SCHEMA_REGISTRY_USERNAME="" \ +SCHEMA_REGISTRY_PASSWORD="" \ +FEEDS_FILE_PATH="/tmp/feeds.json" \ +HTTP_ADDRESS="localhost:3000" \ +FEATURE_TEST_MODE=true \ +go run ./cmd/monitoring/main.go ``` See `go run ./cmd/monitoring/*.go -help` for details. diff --git a/cmd/monitoring/main.go b/cmd/monitoring/main.go index 4f477c6bb..25d4a9c76 100644 --- a/cmd/monitoring/main.go +++ b/cmd/monitoring/main.go @@ -13,6 +13,7 @@ import ( "github.com/gagliardetto/solana-go/rpc" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/smartcontractkit/chainlink-solana/pkg/monitoring" + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" "github.com/smartcontractkit/chainlink/core/logger" "go.uber.org/zap/zapcore" ) @@ -24,7 +25,7 @@ func main() { log := logger.NewLogger(loggerConfig{}) - cfg, err := monitoring.ParseConfig(bgCtx) + cfg, err := config.Parse() if err != nil { log.Fatalw("failed to parse configuration", "error", err) } @@ -51,41 +52,49 @@ func main() { client := rpc.New(cfg.Solana.RPCEndpoint) schemaRegistry := monitoring.NewSchemaRegistry(cfg.SchemaRegistry, log) - trSchema, err := schemaRegistry.EnsureSchema("transmission-value", monitoring.TransmissionAvroSchema) - + transmissionSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.TransmissionTopic+"-value", monitoring.TransmissionAvroSchema) if err != nil { log.Fatalw("failed to prepare transmission schema", "error", err) } - stSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.ConfigSetTopic+"-value", monitoring.ConfigSetAvroSchema) + configSetSchema, err := 
schemaRegistry.EnsureSchema(cfg.Kafka.ConfigSetTopic+"-value", monitoring.ConfigSetAvroSchema) if err != nil { log.Fatalf("failed to prepare config_set schema", "error", err) } - - csSimplifiedSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.ConfigSetSimplifiedTopic+"-value", monitoring.ConfigSetSimplifiedAvroSchema) + configSetSimplifiedSchema, err := schemaRegistry.EnsureSchema(cfg.Kafka.ConfigSetSimplifiedTopic+"-value", monitoring.ConfigSetSimplifiedAvroSchema) if err != nil { log.Fatalf("failed to prepare config_set_simplified schema", "error", err) } + producer, err := monitoring.NewProducer(bgCtx, log.With("component", "producer"), cfg.Kafka) if err != nil { log.Fatalf("failed to create kafka producer", "error", err) } - var trReader, stReader monitoring.AccountReader - if testMode, envVarPresent := os.LookupEnv("TEST_MODE"); envVarPresent && testMode == "enabled" { - trReader = monitoring.NewRandomDataReader(bgCtx, wg, "transmission", log.With("component", "rand-reader", "account", "transmissions")) - stReader = monitoring.NewRandomDataReader(bgCtx, wg, "state", log.With("component", "rand-reader", "account", "state")) + var transmissionReader, stateReader monitoring.AccountReader + if cfg.Feature.TestMode { + transmissionReader = monitoring.NewRandomDataReader(bgCtx, wg, "transmission", log.With("component", "rand-reader", "account", "transmissions")) + stateReader = monitoring.NewRandomDataReader(bgCtx, wg, "state", log.With("component", "rand-reader", "account", "state")) } else { - trReader = monitoring.NewTransmissionReader(client) - stReader = monitoring.NewStateReader(client) + transmissionReader = monitoring.NewTransmissionReader(client) + stateReader = monitoring.NewStateReader(client) } monitor := monitoring.NewMultiFeedMonitor( + cfg.Solana, + cfg.Feeds.Feeds, + log, - cfg, - trReader, stReader, - trSchema, stSchema, csSimplifiedSchema, + transmissionReader, stateReader, producer, monitoring.DefaultMetrics, + + cfg.Kafka.ConfigSetTopic, + 
cfg.Kafka.ConfigSetSimplifiedTopic, + cfg.Kafka.TransmissionTopic, + + configSetSchema, + configSetSimplifiedSchema, + transmissionSchema, ) wg.Add(1) go func() { diff --git a/go.mod b/go.mod index a5d3365d3..ab63f18a5 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/gagliardetto/treeout v0.1.4 github.com/golang/protobuf v1.5.2 github.com/linkedin/goavro v2.1.0+incompatible + github.com/mr-tron/base58 v1.2.0 github.com/onsi/ginkgo/v2 v2.0.0-rc2 github.com/onsi/gomega v1.17.0 github.com/pkg/errors v0.9.1 @@ -145,7 +146,6 @@ require ( github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect - github.com/mr-tron/base58 v1.2.0 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect diff --git a/pkg/monitoring/account_reader.go b/pkg/monitoring/account_reader.go index f21a23a26..a1c73fc75 100644 --- a/pkg/monitoring/account_reader.go +++ b/pkg/monitoring/account_reader.go @@ -3,15 +3,12 @@ package monitoring import ( "context" "fmt" - "time" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" pkgSolana "github.com/smartcontractkit/chainlink-solana/pkg/solana" ) -const solanaAccountReadTimeout = 2 * time.Second - // AccountReader is a wrapper on top of *rpc.Client type AccountReader interface { Read(ctx context.Context, account solana.PublicKey) (interface{}, error) @@ -31,8 +28,6 @@ type trReader struct { } func (t *trReader) Read(ctx context.Context, transmissionsAccount solana.PublicKey) (interface{}, error) { - ctx, cancel := context.WithTimeout(ctx, solanaAccountReadTimeout) - defer cancel() answer, blockNum, err := 
pkgSolana.GetLatestTransmission(ctx, t.client, transmissionsAccount) return TransmissionEnvelope{answer, blockNum}, err } @@ -51,9 +46,6 @@ type StateEnvelope struct { } func (s *stReader) Read(ctx context.Context, stateAccount solana.PublicKey) (interface{}, error) { - ctx, cancel := context.WithTimeout(ctx, solanaAccountReadTimeout) - defer cancel() - state, blockNum, err := pkgSolana.GetState(ctx, s.client, stateAccount) if err != nil { return nil, fmt.Errorf("failed to fetch state : %w", err) diff --git a/pkg/monitoring/schemas.go b/pkg/monitoring/avro_schemas.go similarity index 51% rename from pkg/monitoring/schemas.go rename to pkg/monitoring/avro_schemas.go index 3f71e1362..c9579271a 100644 --- a/pkg/monitoring/schemas.go +++ b/pkg/monitoring/avro_schemas.go @@ -82,6 +82,10 @@ var configSetAvroSchema = Record("config_set", Opts{Namespace: "link.chain.ocr2" Field("billing", Opts{}, Record("billing", Opts{}, Fields{ Field("observation_payment", Opts{Doc: "uint32"}, Long), })), + // These two fields (validator, flagging_threshold) have been removed from the program's + // state but they have been kept here to preserve backwards compatibility. 
+ Field("validator", Opts{Doc: "[32]byte"}, Bytes), + Field("flagging_threshold", Opts{Doc: "uint32"}, Long), })), Field("oracles", Opts{}, Array(Record("oracle", Opts{}, Fields{ Field("transmitter", Opts{Doc: "[32]byte"}, Bytes), @@ -116,6 +120,23 @@ var configSetAvroSchema = Record("config_set", Opts{Namespace: "link.chain.ocr2" })), }) +var configSetSimplifiedAvroSchema = Record("config_set_simplified", Opts{Namespace: "link.chain.ocr2"}, Fields{ + Field("config_digest", Opts{Doc: "[32]byte encoded as base64"}, String), + Field("block_number", Opts{Doc: "uint64 big endian"}, Bytes), + Field("signers", Opts{Doc: "json encoded array of base64-encoded signing keys"}, String), + Field("transmitters", Opts{Doc: "json encoded array of base64-encoded transmission keys"}, String), + Field("f", Opts{Doc: "uint8"}, Int), + Field("delta_progress", Opts{Doc: "uint64 big endian"}, Bytes), + Field("delta_resend", Opts{Doc: "uint64 big endian"}, Bytes), + Field("delta_round", Opts{Doc: "uint64 big endian"}, Bytes), + Field("delta_grace", Opts{Doc: "uint64 big endian"}, Bytes), + Field("delta_stage", Opts{Doc: "uint64 big endian"}, Bytes), + Field("r_max", Opts{Doc: "uint32"}, Long), + Field("s", Opts{Doc: "json encoded []int"}, String), + Field("oracles", Opts{Doc: "json encoded list of oracles"}, String), + Field("feed_state_account", Opts{Doc: "[32]byte"}, String), +}) + var transmissionAvroSchema = Record("transmission", Opts{Namespace: "link.chain.ocr2"}, Fields{ Field("block_number", Opts{Doc: "uint64 big endian"}, Bytes), Field("answer", Opts{}, Record("answer", Opts{}, Fields{ @@ -140,33 +161,16 @@ var transmissionAvroSchema = Record("transmission", Opts{Namespace: "link.chain.
})), }) -var configSimplifiedSetAvroSchema = Record("config_set_simplified", Opts{Namespace: "link.chain.ocr2"}, Fields{ - Field("config_digest", Opts{}, String), - Field("block_number", Opts{Doc: "uint64 big endian"}, Bytes), - Field("signers", Opts{}, String), - Field("transmitters", Opts{}, String), - Field("f", Opts{Doc: "uint8"}, Int), - Field("delta_progress", Opts{Doc: "uint64 big endian"}, Bytes), - Field("delta_resend", Opts{Doc: "uint64 big endian"}, Bytes), - Field("delta_round", Opts{Doc: "uint64 big endian"}, Bytes), - Field("delta_grace", Opts{Doc: "uint64 big endian"}, Bytes), - Field("delta_stage", Opts{Doc: "uint64 big endian"}, Bytes), - Field("r_max", Opts{Doc: "uint32"}, Long), - Field("s", Opts{Doc: ""}, String), - Field("oracles", Opts{Doc: ""}, String), - Field("feed_state_account", Opts{Doc: ""}, String), -}) - var ( // Avro schemas to sync with the registry ConfigSetAvroSchema string - TransmissionAvroSchema string ConfigSetSimplifiedAvroSchema string + TransmissionAvroSchema string // These codecs are used in tests configSetCodec *goavro.Codec - transmissionCodec *goavro.Codec configSetSimplifiedCodec *goavro.Codec + transmissionCodec *goavro.Codec ) func init() { @@ -177,190 +181,35 @@ func init() { } ConfigSetAvroSchema = string(buf) - buf, err = json.Marshal(transmissionAvroSchema) + buf, err = json.Marshal(configSetSimplifiedAvroSchema) if err != nil { - panic(fmt.Errorf("failed to generate Avro schema for transmission: %w", err)) + panic(fmt.Errorf("failed to generate Avro schema for configSimplified: %w", err)) } - TransmissionAvroSchema = string(buf) + ConfigSetSimplifiedAvroSchema = string(buf) - buf, err = json.Marshal(configSimplifiedSetAvroSchema) + buf, err = json.Marshal(transmissionAvroSchema) if err != nil { - panic(fmt.Errorf("failed to generate Avro schema for configSimplified: %w", err)) + panic(fmt.Errorf("failed to generate Avro schema for transmission: %w", err)) } - ConfigSetSimplifiedAvroSchema = string(buf) + 
TransmissionAvroSchema = string(buf) configSetCodec, err = goavro.NewCodec(ConfigSetAvroSchema) if err != nil { panic(fmt.Errorf("failed to parse Avro schema for the config set: %w", err)) } - transmissionCodec, err = goavro.NewCodec(TransmissionAvroSchema) + configSetSimplifiedCodec, err = goavro.NewCodec(ConfigSetSimplifiedAvroSchema) if err != nil { - panic(fmt.Errorf("failed to parse Avro schema for the latest transmission: %w", err)) + panic(fmt.Errorf("failed to parse Avro schema for the latest configSetSimplified: %w", err)) } - configSetSimplifiedCodec, err = goavro.NewCodec(ConfigSetSimplifiedAvroSchema) + transmissionCodec, err = goavro.NewCodec(TransmissionAvroSchema) if err != nil { - panic(fmt.Errorf("failed to parse Avro schema for the latest configSetSimplified: %w", err)) + panic(fmt.Errorf("failed to parse Avro schema for the latest transmission: %w", err)) } // These codecs are used in tests but not in main, so the linter complains. _ = configSetCodec - _ = transmissionCodec _ = configSetSimplifiedCodec + _ = transmissionCodec } - -/* Keeping the original schemas here for reference. 
- -const ConfigSetAvroSchema = ` -{ - "namespace": "link.chain.ocr2", - "type": "record", - "name": "config_set", - "fields": [ - {"name": "block_number", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "contract_config", "type": {"type": "record", "name": "contract_config", "fields": [ - {"name": "config_digest", "type": "bytes", "doc": "[32]byte"}, - {"name": "config_count", "type": "long", "doc": "uint32"}, - {"name": "signers", "type": {"type": "array", "items": "bytes"}}, - {"name": "transmitters", "type": {"type": "array", "items": "bytes"}}, - {"name": "f", "type": "int", "doc": "uint8"}, - {"name": "onchain_config", "type": [ - {"name": "ocr2_numerical_median_onchain_config", "type": "record", "fields": [ - {"name": "min", "type": "bytes", "doc": "*big.Int"}, - {"name": "max", "type": "bytes", "doc": "*big.Int"} - ]} - ]}, - {"name": "offchain_config_version", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "offchain_config", "type": [ - {"name": "ocr2_offchain_config", "type": "record", "fields": [ - {"name": "delta_progress_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "delta_resend_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "delta_round_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "delta_grace_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "delta_stage_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "r_max", "type": "long", "doc": "uint32"}, - {"name": "s", "type": {"type": "array", "items": "long"}, "doc": "[]uint32"}, - {"name": "offchain_public_keys", "type": {"type": "array", "items": "bytes"}}, - {"name": "peer_ids", "type": {"type": "array", "items": "string"}}, - {"name": "reporting_plugin_config", "type": [ - {"name": "ocr2_numerical_median_offchain_config", "type": "record", "fields": [ - {"name": "alpha_report_infinite", "type": "boolean"}, - {"name": "alpha_report_ppb", "type": "bytes", "doc": "uint64 
big endian"}, - {"name": "alpha_accept_infinite", "type": "boolean"}, - {"name": "alpha_accept_ppb", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "delta_c_nanoseconds", "type": "bytes", "doc": "uint64 big endian"} - ]} - ]}, - {"name": "max_duration_query_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "max_duration_observation_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "max_duration_report_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "max_duration_should_accept_finalized_report_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "max_duration_should_transmit_accepted_report_nanoseconds", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "shared_secret_encryptions", "type": {"type": "record", "name": "shared_secret_encryptions", "fields": [ - {"name": "diffie_hellman_point", "type": "bytes"}, - {"name": "shared_secret_hash", "type": "bytes"}, - {"name": "encryptions", "type": {"type": "array", "items": "bytes"}} - ]}} - ]} - ]} - ]}}, - {"name": "solana_program_state", "type": {"type": "record", "name": "solana_program_state", "fields": [ - {"name": "account_discriminator", "type": "bytes", "doc": "[8]byte" }, - {"name": "nonce", "type": "int" }, - {"name": "config", "type": {"type": "record", "name": "config", "fields": [ - {"name": "version", "type": "int" }, - {"name": "owner", "type": "bytes", "doc": "[32]byte" }, - {"name": "token_mint", "type": "bytes", "doc": "[32]byte" }, - {"name": "token_vault", "type": "bytes", "doc": "[32]byte" }, - {"name": "requester_access_controller", "type": "bytes", "doc": "[32]byte" }, - {"name": "billing_access_controller", "type": "bytes", "doc": "[32]byte" }, - {"name": "min_answer", "type": "bytes", "doc": "big.Int" }, - {"name": "max_answer", "type": "bytes", "doc": "big.Int" }, - {"name": "decimals", "type": "int" }, - {"name": "description", "type": "bytes", "doc": "[32]byte" }, - {"name": "f", "type": "int" 
}, - {"name": "config_count", "type": "int" }, - {"name": "latest_config_digest", "type": "bytes", "doc": "[32]byte" }, - {"name": "latest_config_block_number", "type": "long" }, - {"name": "latest_aggregator_round_id", "type": "int" }, - {"name": "epoch", "type": "int" }, - {"name": "round", "type": "int" }, - {"name": "billing", "type": {"type": "record", "name": "billing", "fields": [ - {"name": "observation_payment", "type": "int"} - ]}}, - {"name": "validator", "type": "bytes", "doc": "[32]byte"}, - {"name": "flagging_threshold", "type": "long"} - ]}}, - {"name": "oracles", "type": {"type": "array", "items": - {"name": "oracle", "type": "record", "fields": [ - {"name": "transmitter", "type": "bytes", "doc": "[32]byte" }, - {"name": "signer", "type": {"type": "record", "name": "signer", "fields": [ - {"name": "key", "type": "bytes", "doc": "[20]byte" } - ]}}, - {"name": "payee", "type": "bytes", "doc": "[32]byte" }, - {"name": "proposed_payee", "type": "bytes", "doc": "[32]byte" }, - {"name": "payment", "type": "long" }, - {"name": "from_round_id", "type": "int" } - ]} - }}, - {"name": "leftover_payment", "type": {"type": "array", "items": - {"name": "leftover_payment", "type": "record", "fields": [ - {"name": "payee", "type": "bytes", "doc": "[32]byte" }, - {"name": "amount", "type": "long" } - ]} - }}, - {"name": "leftover_payment_len", "type": "int" }, - {"name": "transmissions", "type": "bytes", "doc": "[32]byte" } - ]}}, - {"name": "solana_chain_config", "type": {"type": "record", "name": "solana_chain_config", "fields": [ - {"name": "network_name", "type": "string"}, - {"name": "network_id", "type": "string"}, - {"name": "chain_id", "type": "string"} - ]}}, - {"name": "feed_config", "type": {"type": "record", "name": "feed_config", "fields": [ - {"name": "feed_name", "type": "string"}, - {"name": "feed_path", "type": "string"}, - {"name": "symbol", "type": "string"}, - {"name": "heartbeat_sec", "type": "long"}, - {"name": "contract_type", "type": 
"string"}, - {"name": "contract_status", "type": "string"}, - {"name": "contract_address", "type": "bytes", "doc": "[32]byte"}, - {"name": "transmissions_account", "type": "bytes", "doc": "[32]byte"}, - {"name": "state_account", "type": "bytes", "doc": "[32]byte"} - ]}} - ] -} -` - -const TransmissionAvroSchema = ` -{ - "namespace": "link.chain.ocr2", - "type": "record", - "name": "transmission", - "fields": [ - {"name": "block_number", "type": "bytes", "doc": "uint64 big endian"}, - {"name": "answer", "type": {"type": "record", "name": "answer", "fields": [ - {"name": "data", "type": "bytes", "doc": "*big.Int"}, - {"name": "timestamp", "type": "long", "doc": "uint32"} - ]}}, - {"name": "solana_chain_config", "type": {"type": "record", "name": "solana_chain_config", "fields": [ - {"name": "network_name", "type": "string"}, - {"name": "network_id", "type": "string"}, - {"name": "chain_id", "type": "string"} - ]}}, - {"name": "feed_config", "type": {"type": "record", "name": "feed_config", "fields": [ - {"name": "feed_name", "type": "string"}, - {"name": "feed_path", "type": "string"}, - {"name": "symbol", "type": "string"}, - {"name": "heartbeat_sec", "type": "long"}, - {"name": "contract_type", "type": "string"}, - {"name": "contract_status", "type": "string"}, - {"name": "contract_address", "type": "bytes", "doc": "[32]byte"}, - {"name": "transmissions_account", "type": "bytes", "doc": "[32]byte"}, - {"name": "state_account", "type": "bytes", "doc": "[32]byte"} - ]}} - ] -} -` -*/ diff --git a/pkg/monitoring/benchmark_test.go b/pkg/monitoring/benchmark_test.go index 730874b00..799dcef6e 100644 --- a/pkg/monitoring/benchmark_test.go +++ b/pkg/monitoring/benchmark_test.go @@ -5,6 +5,7 @@ import ( "sync" "testing" + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" "github.com/smartcontractkit/chainlink/core/logger" ) @@ -17,38 +18,50 @@ import ( // 48993 35111 ns/op 44373 B/op 251 allocs/op // (13 Dec 2021) // 47331 34285 ns/op 
41074 B/op 235 allocs/op +// (3 Jan 2022) +// 6985 162187 ns/op 114802 B/op 1506 allocs/op +// (4 Jan 2022) +// 9332 166275 ns/op 157078 B/op 1590 allocs/op -func BenchmarkMultichainMonitor(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func BenchmarkMultichainMonitorStatePath(b *testing.B) { wg := &sync.WaitGroup{} defer wg.Wait() - feed := generateFeedConfig() - feed.PollInterval = 0 // poll as quickly as possible. - cfg := Config{} - cfg.Feeds = []FeedConfig{feed} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cfg := config.Config{} + cfg.Solana.PollInterval = 0 // poll as quickly as possible. + cfg.Feeds.Feeds = []config.Feed{generateFeedConfig()} transmissionSchema := fakeSchema{transmissionCodec} - stateSchema := fakeSchema{configSetCodec} + configSetSchema := fakeSchema{configSetCodec} configSetSimplifiedSchema := fakeSchema{configSetCodec} - producer := fakeProducer{make(chan producerMessage)} + producer := fakeProducer{make(chan producerMessage), ctx} transmissionReader := &fakeReader{make(chan interface{})} stateReader := &fakeReader{make(chan interface{})} monitor := NewMultiFeedMonitor( + cfg.Solana, + cfg.Feeds.Feeds, + logger.NewNullLogger(), - cfg, transmissionReader, stateReader, - transmissionSchema, stateSchema, configSetSimplifiedSchema, producer, &devnullMetrics{}, + + cfg.Kafka.ConfigSetTopic, + cfg.Kafka.ConfigSetSimplifiedTopic, + cfg.Kafka.TransmissionTopic, + + configSetSchema, + configSetSimplifiedSchema, + transmissionSchema, ) go monitor.Start(ctx, wg) - transmission := generateTransmissionEnvelope() state, err := generateStateEnvelope() if err != nil { b.Fatalf("failed to generate state: %v", err) @@ -58,15 +71,78 @@ func BenchmarkMultichainMonitor(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { select { - case transmissionReader.readCh <- transmission: case stateReader.readCh <- state: case <-ctx.Done(): - break + continue + } + select { + case 
<-producer.sendCh: + case <-ctx.Done(): + continue + } + } +} + +// Results: +// goos: darwin +// goarch: amd64 +// pkg: github.com/smartcontractkit/chainlink-solana/pkg/monitoring +// cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +// (4 Jan 2022) +// 61338 18841 ns/op 6606 B/op 137 allocs/op +func BenchmarkMultichainMonitorTransmissionPath(b *testing.B) { + wg := &sync.WaitGroup{} + defer wg.Wait() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cfg := config.Config{} + cfg.Solana.PollInterval = 0 // poll as quickly as possible. + cfg.Feeds.Feeds = []config.Feed{generateFeedConfig()} + + transmissionSchema := fakeSchema{transmissionCodec} + configSetSchema := fakeSchema{configSetCodec} + configSetSimplifiedSchema := fakeSchema{configSetCodec} + + producer := fakeProducer{make(chan producerMessage), ctx} + + transmissionReader := &fakeReader{make(chan interface{})} + stateReader := &fakeReader{make(chan interface{})} + + monitor := NewMultiFeedMonitor( + cfg.Solana, + cfg.Feeds.Feeds, + + logger.NewNullLogger(), + transmissionReader, stateReader, + producer, + &devnullMetrics{}, + + cfg.Kafka.ConfigSetTopic, + cfg.Kafka.ConfigSetSimplifiedTopic, + cfg.Kafka.TransmissionTopic, + + configSetSchema, + configSetSimplifiedSchema, + transmissionSchema, + ) + go monitor.Start(ctx, wg) + + transmission := generateTransmissionEnvelope() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + select { + case transmissionReader.readCh <- transmission: + case <-ctx.Done(): + continue } select { case <-producer.sendCh: case <-ctx.Done(): - break + continue } } } diff --git a/pkg/monitoring/config.go b/pkg/monitoring/config.go deleted file mode 100644 index eace3818c..000000000 --- a/pkg/monitoring/config.go +++ /dev/null @@ -1,270 +0,0 @@ -package monitoring - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "net/http" - "os" - "time" - - "github.com/gagliardetto/solana-go" -) - -type Config struct { - 
Solana SolanaConfig `json:"solana,omitempty"` - Kafka KafkaConfig `json:"kafka,omitempty"` - SchemaRegistry SchemaRegistryConfig `json:"schema_registry,omitempty"` - Feeds []FeedConfig `json:"feeds,omitempty"` - Http HttpConfig `json:"http,omitempty"` - FeedsRddURL string `json:"feedsRddUrl,omitempty"` - FeedsFilePath string `json:"feedsFilePath,omitempty"` -} - -type SolanaConfig struct { - RPCEndpoint string `json:"rpc_endpoint,omitempty"` - NetworkName string `json:"network_name,omitempty"` - NetworkID string `json:"network_id,omitempty"` - ChainID string `json:"chain_id,omitempty"` -} - -type KafkaConfig struct { - Brokers string `json:"brokers,omitempty"` - ClientID string `json:"client_id,omitempty"` - SecurityProtocol string `json:"security_protocol,omitempty"` - SaslMechanism string `json:"sasl_mechanism,omitempty"` - SaslUsername string `json:"sasl_username,omitempty"` - SaslPassword string `json:"sasl_password,omitempty"` - TransmissionTopic string `json:"transmission_topic,omitempty"` - ConfigSetTopic string `json:"config_set_topic,omitempty"` - ConfigSetSimplifiedTopic string `json:"config_set_simplified_topic,omitempty"` -} - -type SchemaRegistryConfig struct { - URL string `json:"url,omitempty"` - Username string `json:"username,omitempty"` - Password string `json:"password,omitempty"` -} - -type FeedConfig struct { - // Data extracted from the RDD - FeedName string `json:"feed_name,omitempty"` - FeedPath string `json:"feed_path,omitempty"` - Symbol string `json:"symbol,omitempty"` - HeartbeatSec int64 `json:"heartbeat,omitempty"` - ContractType string `json:"contract_type,omitempty"` - ContractStatus string `json:"contract_status,omitempty"` - - // Equivalent to ProgramID in Solana - ContractAddress solana.PublicKey `json:"contract_address,omitempty"` - TransmissionsAccount solana.PublicKey `json:"transmissions_account,omitempty"` - StateAccount solana.PublicKey `json:"state_account,omitempty"` - - PollInterval time.Duration 
`json:"poll_interval,omitempty"` -} - -type HttpConfig struct { - Address string `json:"address,omitempty"` -} - -const ( - DefaultPollInterval = 5 * time.Second - - rddHttpCallTimeout = 5 * time.Second -) - -// ParseConfig populates a configuration object from various sources: -// - most params are passed as flags to the binary. -// - username and passwords can be overriden by environment variables. -// - feeds configuration can be passed by an RDD url or a local file (useful for testing). -// ParseConfig also validates and parses some of these inputs and returns an error for the first input that is found incorrect. -func ParseConfig(ctx context.Context) (Config, error) { - - cfg := Config{} - flag.StringVar(&cfg.Solana.RPCEndpoint, "solana.rpc_endpoint", "", "") - flag.StringVar(&cfg.Solana.NetworkName, "solana.network_name", "", "") - flag.StringVar(&cfg.Solana.NetworkID, "solana.network_id", "", "") - flag.StringVar(&cfg.Solana.ChainID, "solana.chain_id", "", "") - - flag.StringVar(&cfg.Kafka.ConfigSetTopic, "kafka.config_set_topic", "", "") - flag.StringVar(&cfg.Kafka.ConfigSetSimplifiedTopic, "kafka.config_set_simplified_topic", "", "") - flag.StringVar(&cfg.Kafka.TransmissionTopic, "kafka.transmission_topic", "", "") - flag.StringVar(&cfg.Kafka.Brokers, "kafka.brokers", "", "") - flag.StringVar(&cfg.Kafka.ClientID, "kafka.client_id", "", "") - flag.StringVar(&cfg.Kafka.SecurityProtocol, "kafka.security_protocol", "", "") - flag.StringVar(&cfg.Kafka.SaslMechanism, "kafka.sasl_mechanism", "", "") - flag.StringVar(&cfg.Kafka.SaslUsername, "kafka.sasl_username", "", "") - flag.StringVar(&cfg.Kafka.SaslPassword, "kafka.sasl_password", "", "") - - flag.StringVar(&cfg.SchemaRegistry.URL, "schema_registry.url", "", "") - flag.StringVar(&cfg.SchemaRegistry.Username, "schema_registry.username", "", "") - flag.StringVar(&cfg.SchemaRegistry.Password, "schema_registry.password", "", "") - - flag.StringVar(&cfg.FeedsFilePath, "feeds.file_path", "", "") - 
flag.StringVar(&cfg.FeedsRddURL, "feeds.rdd_url", "", "") - - flag.StringVar(&cfg.Http.Address, "http.address", "", "") - - flag.Parse() - - parseEnvVars(&cfg) - - for flagName, value := range map[string]string{ - "-solana.rpc_endpoint": cfg.Solana.RPCEndpoint, - - "-kafka.brokers": cfg.Kafka.Brokers, - "-kafka.client_id": cfg.Kafka.ClientID, - "-kafka.security_protocol": cfg.Kafka.SecurityProtocol, - - "-schema_registry.url": cfg.SchemaRegistry.URL, - - "-http.address": cfg.Http.Address, - } { - if value == "" { - return cfg, fmt.Errorf("flag '%s' is required", flagName) - } - } - - var feeds = []jsonFeedConfig{} - if cfg.FeedsFilePath == "" && cfg.FeedsRddURL == "" { - return cfg, fmt.Errorf("feeds configuration missing, either '-feeds.file_path' or '-feeds.rdd_url' must be set") - } else if cfg.FeedsRddURL != "" { - rddCtx, cancel := context.WithTimeout(ctx, rddHttpCallTimeout) - defer cancel() - readFeedsReq, err := http.NewRequestWithContext(rddCtx, http.MethodGet, cfg.FeedsRddURL, nil) - if err != nil { - return cfg, fmt.Errorf("unable to build a request to the RDD URL '%s': %w", cfg.FeedsRddURL, err) - } - httpClient := &http.Client{} - res, err := httpClient.Do(readFeedsReq) - if err != nil { - return cfg, fmt.Errorf("unable to fetch RDD data from URL '%s': %w", cfg.FeedsRddURL, err) - } - defer res.Body.Close() - decoder := json.NewDecoder(res.Body) - if err := decoder.Decode(&feeds); err != nil { - return cfg, fmt.Errorf("unable to unmarshal feeds config from RDD URL '%s': %w", cfg.FeedsRddURL, err) - } - } else if cfg.FeedsFilePath != "" { - contents, err := os.ReadFile(cfg.FeedsFilePath) - if err != nil { - return cfg, fmt.Errorf("unable to read feeds file '%s': %w", cfg.FeedsFilePath, err) - } - if err = json.Unmarshal(contents, &feeds); err != nil { - return cfg, fmt.Errorf("unable to unmarshal feeds config from file '%s': %w", cfg.FeedsFilePath, err) - } - } - - cfg.Feeds = make([]FeedConfig, len(feeds)) - for i, feed := range feeds { - 
contractAddress, err := solana.PublicKeyFromBase58(feed.ContractAddressBase58) - if err != nil { - return cfg, fmt.Errorf("failed to parse program id '%s' from JSON at index i=%d: %w", feed.ContractAddressBase58, i, err) - } - transmissionsAccount, err := solana.PublicKeyFromBase58(feed.TransmissionsAccountBase58) - if err != nil { - return cfg, fmt.Errorf("failed to parse transmission account '%s' from JSON at index i=%d: %w", feed.TransmissionsAccountBase58, i, err) - } - stateAccount, err := solana.PublicKeyFromBase58(feed.StateAccountBase58) - if err != nil { - return cfg, fmt.Errorf("failed to parse state account '%s' from JSON at index i=%d: %w", feed.StateAccountBase58, i, err) - } - pollInterval := DefaultPollInterval - if feed.PollIntervalMilliseconds != 0 { - pollInterval = time.Duration(feed.PollIntervalMilliseconds) * time.Millisecond - } - cfg.Feeds[i] = FeedConfig{ - feed.FeedName, - feed.FeedPath, - feed.Symbol, - feed.Heartbeat, - feed.ContractType, - feed.ContractStatus, - contractAddress, - transmissionsAccount, - stateAccount, - pollInterval, - } - } - - return cfg, nil -} - -func parseEnvVars(cfg *Config) { - if value, isPresent := os.LookupEnv("SOLANA_RPC_ENDPOINT"); isPresent { - cfg.Solana.RPCEndpoint = value - } - if value, isPresent := os.LookupEnv("SOLANA_NETWORK_NAME"); isPresent { - cfg.Solana.NetworkName = value - } - if value, isPresent := os.LookupEnv("SOLANA_NETWORK_ID"); isPresent { - cfg.Solana.NetworkID = value - } - if value, isPresent := os.LookupEnv("SOLANA_CHAIN_ID"); isPresent { - cfg.Solana.ChainID = value - } - if value, isPresent := os.LookupEnv("KAFKA_TRANSMISSION_TOPIC"); isPresent { - cfg.Kafka.TransmissionTopic = value - } - if value, isPresent := os.LookupEnv("KAFKA_CONFIG_SET_TOPIC"); isPresent { - cfg.Kafka.ConfigSetTopic = value - } - if value, isPresent := os.LookupEnv("KAFKA_CONFIG_SET_SIMPLIFIED_TOPIC"); isPresent { - cfg.Kafka.ConfigSetSimplifiedTopic = value - } - if value, isPresent := 
os.LookupEnv("KAFKA_BROKERS"); isPresent { - cfg.Kafka.Brokers = value - } - if value, isPresent := os.LookupEnv("KAFKA_CLIENT_ID"); isPresent { - cfg.Kafka.ClientID = value - } - if value, isPresent := os.LookupEnv("KAFKA_SECURITY_PROTOCOL"); isPresent { - cfg.Kafka.SecurityProtocol = value - } - if value, isPresent := os.LookupEnv("KAFKA_SASL_MECHANISM"); isPresent { - cfg.Kafka.SaslMechanism = value - } - if value, isPresent := os.LookupEnv("KAFKA_SASL_USERNAME"); isPresent { - cfg.Kafka.SaslUsername = value - } - if value, isPresent := os.LookupEnv("KAFKA_SASL_PASSWORD"); isPresent { - cfg.Kafka.SaslPassword = value - } - - if value, isPresent := os.LookupEnv("SCHEMA_REGISTRY_URL"); isPresent { - cfg.SchemaRegistry.URL = value - } - if value, isPresent := os.LookupEnv("SCHEMA_REGISTRY_USERNAME"); isPresent { - cfg.SchemaRegistry.Username = value - } - if value, isPresent := os.LookupEnv("SCHEMA_REGISTRY_PASSWORD"); isPresent { - cfg.SchemaRegistry.Password = value - } - - if value, isPresent := os.LookupEnv("HTTP_ADDRESS"); isPresent { - cfg.Http.Address = value - } - - if value, isPresent := os.LookupEnv("FEEDS_FILE_PATH"); isPresent { - cfg.FeedsFilePath = value - } - if value, isPresent := os.LookupEnv("FEEDS_URL"); isPresent { - cfg.FeedsRddURL = value - } -} - -type jsonFeedConfig struct { - FeedName string `json:"name,omitempty"` - FeedPath string `json:"path,omitempty"` - Symbol string `json:"symbol,omitempty"` - Heartbeat int64 `json:"heartbeat,omitempty"` - ContractType string `json:"contract_type,omitempty"` - ContractStatus string `json:"status,omitempty"` - - ContractAddressBase58 string `json:"contract_address_base58,omitempty"` - TransmissionsAccountBase58 string `json:"transmissions_account_base58,omitempty"` - StateAccountBase58 string `json:"state_account_base58,omitempty"` - - PollIntervalMilliseconds int64 `json:"poll_interval_milliseconds,omitempty"` -} diff --git a/pkg/monitoring/config/config.go b/pkg/monitoring/config/config.go new file 
mode 100644 index 000000000..a004b4e6c --- /dev/null +++ b/pkg/monitoring/config/config.go @@ -0,0 +1,178 @@ +package config + +import ( + "fmt" + "net/url" + "os" + "strconv" + "time" +) + +func Parse() (Config, error) { + cfg := Config{} + + if err := parseEnvVars(&cfg); err != nil { + return cfg, err + } + + applyDefaults(&cfg) + + if err := validateConfig(cfg); err != nil { + return cfg, err + } + + err := populateFeeds(&cfg) + return cfg, err +} + +func parseEnvVars(cfg *Config) error { + if value, isPresent := os.LookupEnv("SOLANA_RPC_ENDPOINT"); isPresent { + cfg.Solana.RPCEndpoint = value + } + if value, isPresent := os.LookupEnv("SOLANA_NETWORK_NAME"); isPresent { + cfg.Solana.NetworkName = value + } + if value, isPresent := os.LookupEnv("SOLANA_NETWORK_ID"); isPresent { + cfg.Solana.NetworkID = value + } + if value, isPresent := os.LookupEnv("SOLANA_CHAIN_ID"); isPresent { + cfg.Solana.ChainID = value + } + if value, isPresent := os.LookupEnv("SOLANA_READ_TIMEOUT"); isPresent { + readTimeout, err := time.ParseDuration(value) + if err != nil { + return fmt.Errorf("failed to parse env var SOLANA_READ_TIMEOUT, see https://pkg.go.dev/time#ParseDuration: %w", err) + } + cfg.Solana.ReadTimeout = readTimeout + } + if value, isPresent := os.LookupEnv("SOLANA_POLL_INTERVAL"); isPresent { + pollInterval, err := time.ParseDuration(value) + if err != nil { + return fmt.Errorf("failed to parse env var SOLANA_POLL_INTERVAL, see https://pkg.go.dev/time#ParseDuration: %w", err) + } + cfg.Solana.PollInterval = pollInterval + } + + if value, isPresent := os.LookupEnv("KAFKA_BROKERS"); isPresent { + cfg.Kafka.Brokers = value + } + if value, isPresent := os.LookupEnv("KAFKA_CLIENT_ID"); isPresent { + cfg.Kafka.ClientID = value + } + if value, isPresent := os.LookupEnv("KAFKA_SECURITY_PROTOCOL"); isPresent { + cfg.Kafka.SecurityProtocol = value + } + + if value, isPresent := os.LookupEnv("KAFKA_SASL_MECHANISM"); isPresent { + cfg.Kafka.SaslMechanism = value + } + if value, 
isPresent := os.LookupEnv("KAFKA_SASL_USERNAME"); isPresent { + cfg.Kafka.SaslUsername = value + } + if value, isPresent := os.LookupEnv("KAFKA_SASL_PASSWORD"); isPresent { + cfg.Kafka.SaslPassword = value + } + + if value, isPresent := os.LookupEnv("KAFKA_TRANSMISSION_TOPIC"); isPresent { + cfg.Kafka.TransmissionTopic = value + } + if value, isPresent := os.LookupEnv("KAFKA_CONFIG_SET_TOPIC"); isPresent { + cfg.Kafka.ConfigSetTopic = value + } + if value, isPresent := os.LookupEnv("KAFKA_CONFIG_SET_SIMPLIFIED_TOPIC"); isPresent { + cfg.Kafka.ConfigSetSimplifiedTopic = value + } + + if value, isPresent := os.LookupEnv("SCHEMA_REGISTRY_URL"); isPresent { + cfg.SchemaRegistry.URL = value + } + if value, isPresent := os.LookupEnv("SCHEMA_REGISTRY_USERNAME"); isPresent { + cfg.SchemaRegistry.Username = value + } + if value, isPresent := os.LookupEnv("SCHEMA_REGISTRY_PASSWORD"); isPresent { + cfg.SchemaRegistry.Password = value + } + + if value, isPresent := os.LookupEnv("FEEDS_URL"); isPresent { + cfg.Feeds.URL = value + } + if value, isPresent := os.LookupEnv("FEEDS_FILE_PATH"); isPresent { + cfg.Feeds.FilePath = value + } + if value, isPresent := os.LookupEnv("FEEDS_RDD_READ_TIMEOUT"); isPresent { + readTimeout, err := time.ParseDuration(value) + if err != nil { + return fmt.Errorf("failed to parse env var FEEDS_RDD_READ_TIMEOUT, see https://pkg.go.dev/time#ParseDuration: %w", err) + } + cfg.Feeds.RddReadTimeout = readTimeout + } + if value, isPresent := os.LookupEnv("FEEDS_RDD_POLL_INTERVAL"); isPresent { + pollInterval, err := time.ParseDuration(value) + if err != nil { + return fmt.Errorf("failed to parse env var FEEDS_RDD_POLL_INTERVAL, see https://pkg.go.dev/time#ParseDuration: %w", err) + } + cfg.Feeds.RddPollInterval = pollInterval + } + + if value, isPresent := os.LookupEnv("HTTP_ADDRESS"); isPresent { + cfg.Http.Address = value + } + + if value, isPresent := os.LookupEnv("FEATURE_TEST_MODE"); isPresent { + isTestMode, err := strconv.ParseBool(value) + if err 
!= nil { + return fmt.Errorf("failed to parse boolean env var FEATURE_TEST_MODE, see https://pkg.go.dev/strconv#ParseBool: %w", err) + } + cfg.Feature.TestMode = isTestMode + } + + return nil +} + +func validateConfig(cfg Config) error { + // Required config + for envVarName, currentValue := range map[string]string{ + "SOLANA_RPC_ENDPOINT": cfg.Solana.RPCEndpoint, + "SOLANA_NETWORK_NAME": cfg.Solana.NetworkName, + "SOLANA_NETWORK_ID": cfg.Solana.NetworkID, + "SOLANA_CHAIN_ID": cfg.Solana.ChainID, + + "KAFKA_BROKERS": cfg.Kafka.Brokers, + "KAFKA_CLIENT_ID": cfg.Kafka.ClientID, + "KAFKA_SECURITY_PROTOCOL": cfg.Kafka.SecurityProtocol, + "KAFKA_SASL_MECHANISM": cfg.Kafka.SaslMechanism, + + "KAFKA_CONFIG_SET_TOPIC": cfg.Kafka.ConfigSetTopic, + "KAFKA_TRANSMISSION_TOPIC": cfg.Kafka.TransmissionTopic, + "KAFKA_CONFIG_SET_SIMPLIFIED_TOPIC": cfg.Kafka.ConfigSetSimplifiedTopic, + + "SCHEMA_REGISTRY_URL": cfg.SchemaRegistry.URL, + + "HTTP_ADDRESS": cfg.Http.Address, + } { + if currentValue == "" { + return fmt.Errorf("'%s' env var is required", envVarName) + } + } + // Validate feeds. + if cfg.Feeds.URL == "" && cfg.Feeds.FilePath == "" { + return fmt.Errorf("must set one of 'FEEDS_URL' or 'FEEDS_FILE_PATH'") + } + if cfg.Feeds.URL != "" && cfg.Feeds.FilePath != "" { + return fmt.Errorf("can't set both 'FEEDS_URL' and 'FEEDS_FILE_PATH'. Only one allowed") + } + // Validate URLs. 
+ for envVarName, currentValue := range map[string]string{ + "SOLANA_RPC_ENDPOINT": cfg.Solana.RPCEndpoint, + "SCHEMA_REGISTRY_URL": cfg.SchemaRegistry.URL, + "FEEDS_URL": cfg.Feeds.URL, + } { + if currentValue == "" { + continue + } + if _, err := url.ParseRequestURI(currentValue); err != nil { + return fmt.Errorf("%s='%s' is not a valid URL: %w", envVarName, currentValue, err) + } + } + return nil +} diff --git a/pkg/monitoring/config/defaults.go b/pkg/monitoring/config/defaults.go new file mode 100644 index 000000000..978af5e5e --- /dev/null +++ b/pkg/monitoring/config/defaults.go @@ -0,0 +1,18 @@ +package config + +import "time" + +func applyDefaults(cfg *Config) { + if cfg.Solana.ReadTimeout == 0 { + cfg.Solana.ReadTimeout = 2 * time.Second + } + if cfg.Solana.PollInterval == 0 { + cfg.Solana.PollInterval = 5 * time.Second + } + if cfg.Feeds.RddReadTimeout == 0 { + cfg.Feeds.RddReadTimeout = 1 * time.Second + } + if cfg.Feeds.RddPollInterval == 0 { + cfg.Feeds.RddPollInterval = 10 * time.Second + } +} diff --git a/pkg/monitoring/config/feeds.go b/pkg/monitoring/config/feeds.go new file mode 100644 index 000000000..024961d30 --- /dev/null +++ b/pkg/monitoring/config/feeds.go @@ -0,0 +1,82 @@ +package config + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + + "github.com/gagliardetto/solana-go" +) + +func populateFeeds(cfg *Config) error { + feeds := []jsonFeedConfig{} + if cfg.Feeds.URL != "" { + rddCtx, cancel := context.WithTimeout(context.Background(), cfg.Feeds.RddReadTimeout) + defer cancel() + readFeedsReq, err := http.NewRequestWithContext(rddCtx, http.MethodGet, cfg.Feeds.URL, nil) + if err != nil { + return fmt.Errorf("unable to build a request to the RDD URL '%s': %w", cfg.Feeds.URL, err) + } + httpClient := &http.Client{} + res, err := httpClient.Do(readFeedsReq) + if err != nil { + return fmt.Errorf("unable to fetch RDD data from URL '%s': %w", cfg.Feeds.URL, err) + } + defer res.Body.Close() + decoder := 
json.NewDecoder(res.Body) + if err := decoder.Decode(&feeds); err != nil { + return fmt.Errorf("unable to unmarshal feeds config from RDD URL '%s': %w", cfg.Feeds.URL, err) + } + } else if cfg.Feeds.FilePath != "" { + contents, err := os.ReadFile(cfg.Feeds.FilePath) + if err != nil { + return fmt.Errorf("unable to read feeds file '%s': %w", cfg.Feeds.FilePath, err) + } + if err = json.Unmarshal(contents, &feeds); err != nil { + return fmt.Errorf("unable to unmarshal feeds config from file '%s': %w", cfg.Feeds.FilePath, err) + } + } + + cfg.Feeds.Feeds = make([]Feed, len(feeds)) + for i, feed := range feeds { + contractAddress, err := solana.PublicKeyFromBase58(feed.ContractAddressBase58) + if err != nil { + return fmt.Errorf("failed to parse program id '%s' from JSON at index i=%d: %w", feed.ContractAddressBase58, i, err) + } + transmissionsAccount, err := solana.PublicKeyFromBase58(feed.TransmissionsAccountBase58) + if err != nil { + return fmt.Errorf("failed to parse transmission account '%s' from JSON at index i=%d: %w", feed.TransmissionsAccountBase58, i, err) + } + stateAccount, err := solana.PublicKeyFromBase58(feed.StateAccountBase58) + if err != nil { + return fmt.Errorf("failed to parse state account '%s' from JSON at index i=%d: %w", feed.StateAccountBase58, i, err) + } + cfg.Feeds.Feeds[i] = Feed{ + feed.FeedName, + feed.FeedPath, + feed.Symbol, + feed.Heartbeat, + feed.ContractType, + feed.ContractStatus, + contractAddress, + transmissionsAccount, + stateAccount, + } + } + return nil +} + +type jsonFeedConfig struct { + FeedName string `json:"name,omitempty"` + FeedPath string `json:"path,omitempty"` + Symbol string `json:"symbol,omitempty"` + Heartbeat int64 `json:"heartbeat,omitempty"` + ContractType string `json:"contract_type,omitempty"` + ContractStatus string `json:"status,omitempty"` + + ContractAddressBase58 string `json:"contract_address_base58,omitempty"` + TransmissionsAccountBase58 string `json:"transmissions_account_base58,omitempty"` + 
StateAccountBase58 string `json:"state_account_base58,omitempty"` +} diff --git a/pkg/monitoring/config/types.go b/pkg/monitoring/config/types.go new file mode 100644 index 000000000..5ed925166 --- /dev/null +++ b/pkg/monitoring/config/types.go @@ -0,0 +1,78 @@ +// Package config parses environment variables and a JSON feeds configuration to build +// a Config object that's used throughout the monitor. +package config + +import ( + "time" + + "github.com/gagliardetto/solana-go" +) + +type Config struct { + Solana Solana + Kafka Kafka + SchemaRegistry SchemaRegistry + Feeds Feeds + Http Http + Feature Feature +} + +type Solana struct { + RPCEndpoint string + NetworkName string + NetworkID string + ChainID string + ReadTimeout time.Duration + PollInterval time.Duration +} + +type Kafka struct { + Brokers string + ClientID string + SecurityProtocol string + + SaslMechanism string + SaslUsername string + SaslPassword string + + TransmissionTopic string + ConfigSetTopic string + ConfigSetSimplifiedTopic string +} + +type SchemaRegistry struct { + URL string + Username string + Password string +} + +type Feeds struct { + URL string + FilePath string + Feeds []Feed + RddReadTimeout time.Duration + RddPollInterval time.Duration +} + +type Feed struct { + // Data extracted from the RDD + FeedName string + FeedPath string + Symbol string + HeartbeatSec int64 + ContractType string + ContractStatus string + + // Equivalent to ProgramID in Solana + ContractAddress solana.PublicKey + TransmissionsAccount solana.PublicKey + StateAccount solana.PublicKey +} + +type Http struct { + Address string +} + +type Feature struct { + TestMode bool +} diff --git a/pkg/monitoring/exporter.go b/pkg/monitoring/exporter.go new file mode 100644 index 000000000..c7e1bd3aa --- /dev/null +++ b/pkg/monitoring/exporter.go @@ -0,0 +1,9 @@ +package monitoring + +import ( + "context" +) + +type Exporter interface { + Export(ctx context.Context, data interface{}) +} diff --git 
a/pkg/monitoring/exporter_kafka.go b/pkg/monitoring/exporter_kafka.go new file mode 100644 index 000000000..67356d55e --- /dev/null +++ b/pkg/monitoring/exporter_kafka.go @@ -0,0 +1,113 @@ +package monitoring + +import ( + "context" + + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" + "github.com/smartcontractkit/chainlink/core/logger" +) + +func NewKafkaExporter( + solanaConfig config.Solana, + feedConfig config.Feed, + + log logger.Logger, + producer Producer, + + configSetSchema Schema, + configSetSimplifiedSchema Schema, + transmissionSchema Schema, + + configSetTopic string, + configSetSimplifiedTopic string, + transmissionTopic string, +) Exporter { + return &kafkaExporter{ + solanaConfig, + feedConfig, + + log, + producer, + + configSetSchema, + configSetSimplifiedSchema, + transmissionSchema, + + configSetTopic, + configSetSimplifiedTopic, + transmissionTopic, + } +} + +type kafkaExporter struct { + solanaConfig config.Solana + feedConfig config.Feed + + log logger.Logger + producer Producer + + configSetSchema Schema + configSetSimplifiedSchema Schema + transmissionSchema Schema + + configSetTopic string + configSetSimplifiedTopic string + transmissionTopic string +} + +func (k *kafkaExporter) Export(ctx context.Context, data interface{}) { + key := k.feedConfig.StateAccount.Bytes() + switch typed := data.(type) { + case StateEnvelope: + func() { + configSetMapping, err := MakeConfigSetMapping(typed, k.solanaConfig, k.feedConfig) + if err != nil { + k.log.Errorw("failed to map config_set", "error", err) + return + } + configSetEncoded, err := k.configSetSchema.Encode(configSetMapping) + if err != nil { + k.log.Errorw("failed to encode config_set to Avro", "payload", configSetMapping, "error", err) + return + } + if err := k.producer.Produce(key, configSetEncoded, k.configSetTopic); err != nil { + k.log.Errorw("failed to publish config_set", "payload", configSetMapping, "error", err) + return + } + }() + + func() { + 
configSetSimplifiedMapping, err := MakeConfigSetSimplifiedMapping(typed, k.feedConfig) + if err != nil { + k.log.Errorw("failed to map config_set_simplified", "error", err) + return + } + configSetSimplifiedEncoded, err := k.configSetSimplifiedSchema.Encode(configSetSimplifiedMapping) + if err != nil { + k.log.Errorw("failed to encode config_set_simplified to Avro", "payload", configSetSimplifiedMapping, "error", err) + return + } + if err := k.producer.Produce(key, configSetSimplifiedEncoded, k.configSetSimplifiedTopic); err != nil { + k.log.Errorw("failed to publish config_set_simplified", "payload", configSetSimplifiedMapping, "error", err) + return + } + }() + case TransmissionEnvelope: + transmissionMapping, err := MakeTransmissionMapping(typed, k.solanaConfig, k.feedConfig) + if err != nil { + k.log.Errorw("failed to map transmission", "error", err) + return + } + transmissionEncoded, err := k.transmissionSchema.Encode(transmissionMapping) + if err != nil { + k.log.Errorw("failed to encode transmission to Avro", "payload", transmissionMapping, "error", err) + return + } + if err := k.producer.Produce(key, transmissionEncoded, k.transmissionTopic); err != nil { + k.log.Errorw("failed to publish transmission", "payload", transmissionMapping, "error", err) + return + } + default: + k.log.Errorf("unknown type %T to export", data) + } +} diff --git a/pkg/monitoring/exporter_prometheus.go b/pkg/monitoring/exporter_prometheus.go new file mode 100644 index 000000000..b0734b09c --- /dev/null +++ b/pkg/monitoring/exporter_prometheus.go @@ -0,0 +1,136 @@ +package monitoring + +import ( + "context" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" + "github.com/smartcontractkit/chainlink/core/logger" +) + +func NewPrometheusExporter( + solanaConfig config.Solana, + feedConfig config.Feed, + log logger.Logger, + metrics Metrics, +) Exporter { + metrics.SetFeedContractMetadata( + solanaConfig.ChainID, + 
feedConfig.ContractAddress.String(), + feedConfig.StateAccount.String(), + feedConfig.ContractStatus, + feedConfig.ContractType, + feedConfig.FeedName, + feedConfig.FeedPath, + solanaConfig.NetworkID, + solanaConfig.NetworkName, + feedConfig.Symbol, + ) + + return &prometheusExporter{ + solanaConfig, + feedConfig, + log, + metrics, + "n/a", + sync.Mutex{}, + } +} + +type prometheusExporter struct { + solanaConfig config.Solana + feedConfig config.Feed + + log logger.Logger + metrics Metrics + + // The transmissions account does not record the latest transmitter node. + // Instead, the state account stores the latest transmitter node's public key. + // We store the latest transmitter in memory to be used to associate the latest update. + latestTransmitter string + latestTransmitterMu sync.Mutex +} + +func (p *prometheusExporter) Export(ctx context.Context, data interface{}) { + switch typed := data.(type) { + case StateEnvelope: + p.metrics.SetNodeMetadata( + p.solanaConfig.ChainID, + p.solanaConfig.NetworkID, + p.solanaConfig.NetworkName, + "n/a", + typed.State.Config.LatestTransmitter.String(), + ) + + func() { + p.latestTransmitterMu.Lock() + defer p.latestTransmitterMu.Unlock() + p.latestTransmitter = typed.State.Config.LatestTransmitter.String() + }() + case TransmissionEnvelope: + p.metrics.SetHeadTrackerCurrentHead( + typed.BlockNumber, + p.solanaConfig.NetworkName, + p.solanaConfig.ChainID, + p.solanaConfig.NetworkID, + ) + p.metrics.SetOffchainAggregatorAnswers( + typed.Answer.Data, + p.feedConfig.ContractAddress.String(), + p.feedConfig.StateAccount.String(), + p.solanaConfig.ChainID, + p.feedConfig.ContractStatus, + p.feedConfig.ContractType, + p.feedConfig.FeedName, + p.feedConfig.FeedPath, + p.solanaConfig.NetworkID, + p.solanaConfig.NetworkName, + ) + p.metrics.IncOffchainAggregatorAnswersTotal( + p.feedConfig.ContractAddress.String(), + p.feedConfig.StateAccount.String(), + p.solanaConfig.ChainID, + p.feedConfig.ContractStatus, + 
p.feedConfig.ContractType, + p.feedConfig.FeedName, + p.feedConfig.FeedPath, + p.solanaConfig.NetworkID, + p.solanaConfig.NetworkName, + ) + + isLateAnswer := time.Since(time.Unix(int64(typed.Answer.Timestamp), 0)).Seconds() > float64(p.feedConfig.HeartbeatSec) + p.metrics.SetOffchainAggregatorAnswerStalled( + isLateAnswer, + p.feedConfig.ContractAddress.String(), + p.feedConfig.StateAccount.String(), + p.solanaConfig.ChainID, + p.feedConfig.ContractStatus, + p.feedConfig.ContractType, + p.feedConfig.FeedName, + p.feedConfig.FeedPath, + p.solanaConfig.NetworkID, + p.solanaConfig.NetworkName, + ) + + func() { + p.latestTransmitterMu.Lock() + defer p.latestTransmitterMu.Unlock() + p.metrics.SetOffchainAggregatorSubmissionReceivedValues( + typed.Answer.Data, + p.feedConfig.ContractAddress.String(), + p.feedConfig.StateAccount.String(), + p.latestTransmitter, + p.solanaConfig.ChainID, + p.feedConfig.ContractStatus, + p.feedConfig.ContractType, + p.feedConfig.FeedName, + p.feedConfig.FeedPath, + p.solanaConfig.NetworkID, + p.solanaConfig.NetworkName, + ) + }() + default: + p.log.Errorf("unexpected type %T for export", data) + } +} diff --git a/pkg/monitoring/exporter_prometheus_test.go b/pkg/monitoring/exporter_prometheus_test.go new file mode 100644 index 000000000..0534d6a4d --- /dev/null +++ b/pkg/monitoring/exporter_prometheus_test.go @@ -0,0 +1,55 @@ +package monitoring + +import ( + "context" + "testing" + + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" + "github.com/smartcontractkit/chainlink/core/logger" + "github.com/stretchr/testify/require" +) + +func TestPrometheusExporter(t *testing.T) { + transmissionAccount := generatePublicKey() + stateAccount := generatePublicKey() + + cfg := config.Config{} + cfg.Feeds.Feeds = []config.Feed{ + { + TransmissionsAccount: transmissionAccount, + StateAccount: stateAccount, + }, + } + + ctx := context.Background() + + t.Run("should still publish new transmissions even if a 
transmitter is not set", func(t *testing.T) { + metrics := &keepLatestMetrics{} + exporter := NewPrometheusExporter( + cfg.Solana, cfg.Feeds.Feeds[0], + logger.NewNullLogger(), + metrics, + ) + + envelope := generateTransmissionEnvelope() + exporter.Export(ctx, envelope) + require.Equal(t, metrics.latestTransmitter, "n/a") + }) + t.Run("should publish a new transmission with latest transmitter", func(t *testing.T) { + metrics := &keepLatestMetrics{} + exporter := NewPrometheusExporter( + cfg.Solana, cfg.Feeds.Feeds[0], + logger.NewNullLogger(), + metrics, + ) + + envelope1, err := generateStateEnvelope() + require.NoError(t, err) + exporter.Export(ctx, envelope1) + + envelope2 := generateTransmissionEnvelope() + exporter.Export(ctx, envelope2) + + require.Equal(t, metrics.latestTransmitter, envelope1.State.Config.LatestTransmitter.String()) + }) +} diff --git a/pkg/monitoring/feed_monitor.go b/pkg/monitoring/feed_monitor.go index 6be176edc..093f35a15 100644 --- a/pkg/monitoring/feed_monitor.go +++ b/pkg/monitoring/feed_monitor.go @@ -2,61 +2,37 @@ package monitoring import ( "context" - "fmt" - "log" - "math/big" - "time" + "sync" "github.com/smartcontractkit/chainlink/core/logger" ) type FeedMonitor interface { - Start(ctx context.Context) + Start(ctx context.Context, wg *sync.WaitGroup) } func NewFeedMonitor( log logger.Logger, - config Config, - feedConfig FeedConfig, transmissionPoller, statePoller Poller, - transmissionSchema, stateSchema, configSetSimplified Schema, - producer Producer, - metrics Metrics, + exporters []Exporter, ) FeedMonitor { return &feedMonitor{ log, - config, - feedConfig, transmissionPoller, statePoller, - transmissionSchema, stateSchema, configSetSimplified, - producer, - metrics, + exporters, } } type feedMonitor struct { - log logger.Logger - config Config - feedConfig FeedConfig - transmissionPoller Poller - statePoller Poller - transmissionSchema Schema - stateSchema Schema - configSetSimplifiedSchema Schema - producer 
Producer - metrics Metrics + log logger.Logger + transmissionPoller Poller + statePoller Poller + exporters []Exporter } // Start should be executed as a goroutine -func (f *feedMonitor) Start(ctx context.Context) { +func (f *feedMonitor) Start(ctx context.Context, wg *sync.WaitGroup) { f.log.Info("starting feed monitor") - f.metrics.SetFeedContractMetadata(f.config.Solana.ChainID, f.feedConfig.ContractAddress.String(), - f.feedConfig.ContractStatus, f.feedConfig.ContractType, f.feedConfig.FeedName, - f.feedConfig.FeedPath, f.config.Solana.NetworkID, f.config.Solana.NetworkName, - f.feedConfig.Symbol) - - latestTransmitter := "" - var latestAnswer *big.Int = nil for { // Wait for an update. var update interface{} @@ -68,102 +44,12 @@ func (f *feedMonitor) Start(ctx context.Context) { case <-ctx.Done(): return } - var err error - switch typed := update.(type) { - case StateEnvelope: - err = f.processConfigSetSimplified(typed) - if err != nil { - break - } - - err = f.processState(typed) - if err != nil { - break - } - latestTransmitter = typed.State.Config.LatestTransmitter.String() - f.metrics.SetNodeMetadata(f.config.Solana.ChainID, f.config.Solana.NetworkID, - f.config.Solana.NetworkName, "n/a", latestTransmitter) - if latestAnswer != nil { - f.metrics.SetOffchainAggregatorSubmissionReceivedValues(latestAnswer, - f.feedConfig.ContractAddress.String(), latestTransmitter, f.config.Solana.ChainID, - f.feedConfig.ContractStatus, f.feedConfig.ContractType, f.feedConfig.FeedName, - f.feedConfig.FeedPath, f.config.Solana.NetworkID, f.config.Solana.NetworkName) - } - case TransmissionEnvelope: - err = f.processTransmission(typed) - latestAnswer = typed.Answer.Data - f.metrics.SetHeadTrackerCurrentHead(typed.BlockNumber, f.config.Solana.NetworkName, - f.config.Solana.ChainID, f.config.Solana.NetworkID) - f.metrics.SetOffchainAggregatorAnswers(latestAnswer, f.feedConfig.ContractAddress.String(), - f.config.Solana.ChainID, f.feedConfig.ContractStatus, 
f.feedConfig.ContractType, - f.feedConfig.FeedName, f.feedConfig.FeedPath, f.config.Solana.NetworkID, - f.config.Solana.NetworkName) - f.metrics.IncOffchainAggregatorAnswersTotal(f.feedConfig.ContractAddress.String(), - f.config.Solana.ChainID, f.feedConfig.ContractStatus, f.feedConfig.ContractType, - f.feedConfig.FeedName, f.feedConfig.FeedPath, f.config.Solana.NetworkID, - f.config.Solana.NetworkName) - isLateAnswer := time.Since(time.Unix(int64(typed.Answer.Timestamp), 0)).Seconds() > float64(f.feedConfig.HeartbeatSec) - f.metrics.SetOffchainAggregatorAnswerStalled(isLateAnswer, f.feedConfig.ContractAddress.String(), - f.config.Solana.ChainID, f.feedConfig.ContractStatus, f.feedConfig.ContractType, - f.feedConfig.FeedName, f.feedConfig.FeedPath, f.config.Solana.NetworkID, - f.config.Solana.NetworkName) - default: - err = fmt.Errorf("unknown update type %T", update) - } - if err != nil { - log.Printf("failed to send message %T: %v", update, err) - continue + for _, exp := range f.exporters { + wg.Add(1) + go func(exp Exporter) { + defer wg.Done() + exp.Export(ctx, update) + }(exp) } } } - -func (f *feedMonitor) processState(envelope StateEnvelope) error { - var mapping map[string]interface{} - mapping, err := MakeConfigSetMapping(envelope, f.config.Solana, f.feedConfig) - if err != nil { - return fmt.Errorf("failed to map message %v: %w", envelope, err) - } - value, err := f.stateSchema.Encode(mapping) - if err != nil { - return fmt.Errorf("failed to enconde message %v: %w", envelope, err) - } - var key = f.feedConfig.StateAccount.Bytes() - if err = f.producer.Produce(key, value, f.config.Kafka.ConfigSetTopic); err != nil { - return fmt.Errorf("failed to publish message %v: %w", envelope, err) - } - return nil -} - -func (f *feedMonitor) processTransmission(envelope TransmissionEnvelope) error { - var mapping map[string]interface{} - mapping, err := MakeTransmissionMapping(envelope, f.config.Solana, f.feedConfig) - if err != nil { - return fmt.Errorf("failed to 
map message %v: %w", envelope, err) - } - value, err := f.transmissionSchema.Encode(mapping) - if err != nil { - return fmt.Errorf("failed to enconde message %v: %w", envelope, err) - } - var key = f.feedConfig.StateAccount.Bytes() - if err = f.producer.Produce(key, value, f.config.Kafka.TransmissionTopic); err != nil { - return fmt.Errorf("failed to publish message %v: %w", envelope, err) - } - return nil -} - -func (f *feedMonitor) processConfigSetSimplified(envelope StateEnvelope) error { - var mapping map[string]interface{} - mapping, err := MakeSimplifiedConfigSetMapping(envelope, f.feedConfig) - if err != nil { - return fmt.Errorf("failed to map message %v: %w", envelope, err) - } - value, err := f.configSetSimplifiedSchema.Encode(mapping) - if err != nil { - return fmt.Errorf("failed to enconde message %v: %w", envelope, err) - } - var key = f.feedConfig.StateAccount.Bytes() - if err = f.producer.Produce(key, value, f.config.Kafka.ConfigSetSimplifiedTopic); err != nil { - return fmt.Errorf("failed to publish message %v: %w", envelope, err) - } - return nil -} diff --git a/pkg/monitoring/feed_monitor_test.go b/pkg/monitoring/feed_monitor_test.go index 7186b61ed..bdcc6161d 100644 --- a/pkg/monitoring/feed_monitor_test.go +++ b/pkg/monitoring/feed_monitor_test.go @@ -2,9 +2,11 @@ package monitoring import ( "context" + "sync" "testing" "time" + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" "github.com/smartcontractkit/chainlink/core/logger" "github.com/stretchr/testify/require" ) @@ -19,35 +21,66 @@ func TestFeedMonitor(t *testing.T) { transmissionAccount := generatePublicKey() stateAccount := generatePublicKey() - fetchInterval := time.Second + pollInterval := 1 * time.Second + readTimeout := 1 * time.Second var bufferCapacity uint32 = 0 // no buffering - transmissionPoller := NewPoller(logger.NewNullLogger(), transmissionAccount, transmissionReader, fetchInterval, bufferCapacity) - statePoller := 
NewPoller(logger.NewNullLogger(), stateAccount, stateReader, fetchInterval, bufferCapacity) + transmissionPoller := NewPoller( + logger.NewNullLogger(), + transmissionAccount, transmissionReader, + pollInterval, readTimeout, + bufferCapacity, + ) + statePoller := NewPoller( + logger.NewNullLogger(), + stateAccount, stateReader, + pollInterval, readTimeout, + bufferCapacity, + ) - producer := fakeProducer{make(chan producerMessage)} + producer := fakeProducer{make(chan producerMessage), ctx} - transmissionSchema := fakeSchema{transmissionCodec} - stateSchema := fakeSchema{configSetCodec} + configSetSchema := fakeSchema{configSetCodec} configSetSimplifiedSchema := fakeSchema{configSetSimplifiedCodec} + transmissionSchema := fakeSchema{transmissionCodec} + + cfg := config.Config{} + cfg.Feeds.Feeds = []config.Feed{ + { + TransmissionsAccount: transmissionAccount, + StateAccount: stateAccount, + }, + } + + exporters := []Exporter{ + NewPrometheusExporter( + cfg.Solana, + cfg.Feeds.Feeds[0], + logger.NewNullLogger(), + &devnullMetrics{}, + ), + NewKafkaExporter( + cfg.Solana, + cfg.Feeds.Feeds[0], + logger.NewNullLogger(), + producer, - cfg := Config{} + configSetSchema, + configSetSimplifiedSchema, + transmissionSchema, - feedConfig := FeedConfig{ - TransmissionsAccount: transmissionAccount, - StateAccount: stateAccount, + cfg.Kafka.ConfigSetTopic, + cfg.Kafka.ConfigSetSimplifiedTopic, + cfg.Kafka.TransmissionTopic, + ), } monitor := NewFeedMonitor( logger.NewNullLogger(), - cfg, - feedConfig, transmissionPoller, statePoller, - transmissionSchema, stateSchema, configSetSimplifiedSchema, - producer, - &devnullMetrics{}, + exporters, ) - go monitor.Start(ctx) + go monitor.Start(ctx, &sync.WaitGroup{}) trCount, stCount := 0, 0 var messages []producerMessage diff --git a/pkg/monitoring/mapping.go b/pkg/monitoring/mapping.go index b10350abd..6b02193ce 100644 --- a/pkg/monitoring/mapping.go +++ b/pkg/monitoring/mapping.go @@ -5,6 +5,9 @@ import ( "encoding/binary" 
"encoding/json" "fmt" + + "github.com/mr-tron/base58" + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/pb" pkgSolana "github.com/smartcontractkit/chainlink-solana/pkg/solana" "google.golang.org/protobuf/proto" @@ -12,11 +15,10 @@ import ( func MakeConfigSetMapping( envelope StateEnvelope, - solanaConfig SolanaConfig, - feedConfig FeedConfig, + solanaConfig config.Solana, + feedConfig config.Feed, ) (map[string]interface{}, error) { - state := envelope.State - offchainConfig, err := parseOffchainConfig(state.Config.OffchainConfig.Raw[:state.Config.OffchainConfig.Len]) + offchainConfig, err := parseOffchainConfig(envelope.State.Config.OffchainConfig.Raw[:envelope.State.Config.OffchainConfig.Len]) if err != nil { return nil, fmt.Errorf("failed to parse OffchainConfig blob from the program state: %w", err) } @@ -24,21 +26,33 @@ func MakeConfigSetMapping( if err != nil { return nil, fmt.Errorf("failed to parse ReportingPluginConfig from OffchainConfig: %w", err) } + sharedSecredEncryptions := map[string]interface{}{ + "diffie_hellman_point": []byte{}, + "shared_secret_hash": []byte{}, + "encryptions": []byte{}, + } + if offchainConfig.SharedSecretEncryptions != nil { + sharedSecredEncryptions = map[string]interface{}{ + "diffie_hellman_point": offchainConfig.SharedSecretEncryptions.DiffieHellmanPoint, + "shared_secret_hash": offchainConfig.SharedSecretEncryptions.SharedSecretHash, + "encryptions": offchainConfig.SharedSecretEncryptions.Encryptions, + } + } out := map[string]interface{}{ "block_number": uint64ToBeBytes(envelope.BlockNumber), "contract_config": map[string]interface{}{ - "config_digest": state.Config.LatestConfigDigest[:], - "config_count": int64(state.Config.ConfigCount), - "signers": extractSigners(state.Oracles), - "transmitters": extractTransmitters(state.Oracles), - "f": int32(state.Config.F), + "config_digest": 
envelope.State.Config.LatestConfigDigest[:], + "config_count": int64(envelope.State.Config.ConfigCount), + "signers": extractSigners(envelope.State.Oracles), + "transmitters": extractTransmitters(envelope.State.Oracles), + "f": int32(envelope.State.Config.F), "onchain_config": map[string]interface{}{ "link.chain.ocr2.ocr2_numerical_median_onchain_config": map[string]interface{}{ - "min": state.Config.MinAnswer.BigInt().Bytes(), - "max": state.Config.MaxAnswer.BigInt().Bytes(), + "min": envelope.State.Config.MinAnswer.BigInt().Bytes(), + "max": envelope.State.Config.MaxAnswer.BigInt().Bytes(), }, }, - "offchain_config_version": uint64ToBeBytes(state.Config.OffchainConfig.Version), + "offchain_config_version": uint64ToBeBytes(envelope.State.Config.OffchainConfig.Version), "offchain_config": map[string]interface{}{ "link.chain.ocr2.ocr2_offchain_config": map[string]interface{}{ "delta_progress_nanoseconds": uint64ToBeBytes(offchainConfig.DeltaProgressNanoseconds), @@ -64,75 +78,43 @@ func MakeConfigSetMapping( "max_duration_report_nanoseconds": uint64ToBeBytes(offchainConfig.MaxDurationReportNanoseconds), "max_duration_should_accept_finalized_report_nanoseconds": uint64ToBeBytes(offchainConfig.MaxDurationShouldAcceptFinalizedReportNanoseconds), "max_duration_should_transmit_accepted_report_nanoseconds": uint64ToBeBytes(offchainConfig.MaxDurationShouldTransmitAcceptedReportNanoseconds), - "shared_secret_encryptions": map[string]interface{}{ - "diffie_hellman_point": offchainConfig.SharedSecretEncryptions.DiffieHellmanPoint, - "shared_secret_hash": offchainConfig.SharedSecretEncryptions.SharedSecretHash, - "encryptions": offchainConfig.SharedSecretEncryptions.Encryptions, - }, + "shared_secret_encryptions": sharedSecredEncryptions, }, }, }, "solana_program_state": map[string]interface{}{ - "account_discriminator": state.AccountDiscriminator[:8], - "version": int32(state.Version), - "nonce": int32(state.Nonce), + "account_discriminator": 
envelope.State.AccountDiscriminator[:8], + "version": int32(envelope.State.Version), + "nonce": int32(envelope.State.Nonce), "config": map[string]interface{}{ - "owner": state.Config.Owner[:], - "token_mint": state.Config.TokenMint[:], - "token_vault": state.Config.TokenVault[:], - "requester_access_controller": state.Config.RequesterAccessController[:], - "billing_access_controller": state.Config.BillingAccessController[:], - "min_answer": state.Config.MinAnswer.BigInt().Bytes(), - "max_answer": state.Config.MaxAnswer.BigInt().Bytes(), - "description": state.Config.Description[:], - "decimals": int32(state.Config.Decimals), - "f": int32(state.Config.F), - "round": int32(state.Config.Round), - "epoch": int64(state.Config.Epoch), - "latest_aggregator_round_id": int64(state.Config.LatestAggregatorRoundID), - "latest_transmitter": state.Config.LatestTransmitter[:], - "config_count": int64(state.Config.ConfigCount), - "latest_config_digest": state.Config.LatestConfigDigest[:], - "latest_config_block_number": uint64ToBeBytes(state.Config.LatestConfigBlockNumber), + "owner": envelope.State.Config.Owner[:], + "token_mint": envelope.State.Config.TokenMint[:], + "token_vault": envelope.State.Config.TokenVault[:], + "requester_access_controller": envelope.State.Config.RequesterAccessController[:], + "billing_access_controller": envelope.State.Config.BillingAccessController[:], + "min_answer": envelope.State.Config.MinAnswer.BigInt().Bytes(), + "max_answer": envelope.State.Config.MaxAnswer.BigInt().Bytes(), + "description": envelope.State.Config.Description[:], + "decimals": int32(envelope.State.Config.Decimals), + "f": int32(envelope.State.Config.F), + "round": int32(envelope.State.Config.Round), + "epoch": int64(envelope.State.Config.Epoch), + "latest_aggregator_round_id": int64(envelope.State.Config.LatestAggregatorRoundID), + "latest_transmitter": envelope.State.Config.LatestTransmitter[:], + "config_count": int64(envelope.State.Config.ConfigCount), + 
"latest_config_digest": envelope.State.Config.LatestConfigDigest[:], + "latest_config_block_number": uint64ToBeBytes(envelope.State.Config.LatestConfigBlockNumber), "billing": map[string]interface{}{ - "observation_payment": int64(state.Config.Billing.ObservationPayment), + "observation_payment": int64(envelope.State.Config.Billing.ObservationPayment), }, + // These two fields (validator, flagging_threshold) have been removed from the program's + // state but they have been kept here to preserve backwards compatibility. + "validator": []byte{}, + "flagging_threshold": 0, }, - "oracles": formatOracles(state.Oracles), - "leftover_payment": formatLeftovers(state.LeftoverPayments), - "transmissions": state.Transmissions[:], - }, - "solana_chain_config": map[string]interface{}{ - "network_name": solanaConfig.NetworkName, - "network_id": solanaConfig.NetworkID, - "chain_id": solanaConfig.ChainID, - }, - "feed_config": map[string]interface{}{ - "feed_name": feedConfig.FeedName, - "feed_path": feedConfig.FeedPath, - "symbol": feedConfig.Symbol, - "heartbeat_sec": int64(feedConfig.HeartbeatSec), - "contract_type": feedConfig.ContractType, - "contract_status": feedConfig.ContractStatus, - "contract_address": feedConfig.ContractAddress.Bytes(), - "transmissions_account": feedConfig.TransmissionsAccount.Bytes(), - "state_account": feedConfig.StateAccount.Bytes(), - }, - } - return out, nil -} - -func MakeTransmissionMapping( - envelope TransmissionEnvelope, - solanaConfig SolanaConfig, - feedConfig FeedConfig, -) (map[string]interface{}, error) { - answer := envelope.Answer - out := map[string]interface{}{ - "block_number": uint64ToBeBytes(envelope.BlockNumber), - "answer": map[string]interface{}{ - "data": answer.Data.Bytes(), - "timestamp": int64(answer.Timestamp), + "oracles": formatOracles(envelope.State.Oracles), + "leftover_payment": formatLeftovers(envelope.State.LeftoverPayments), + "transmissions": envelope.State.Transmissions[:], }, "solana_chain_config": 
map[string]interface{}{ "network_name": solanaConfig.NetworkName, @@ -154,39 +136,36 @@ func MakeTransmissionMapping( return out, nil } -func MakeSimplifiedConfigSetMapping( +func MakeConfigSetSimplifiedMapping( envelope StateEnvelope, - feedConfig FeedConfig, + feedConfig config.Feed, ) (map[string]interface{}, error) { - state := envelope.State - offchainConfig, err := parseOffchainConfig(state.Config.OffchainConfig.Raw[:state.Config.OffchainConfig.Len]) + offchainConfig, err := parseOffchainConfig(envelope.State.Config.OffchainConfig.Raw[:envelope.State.Config.OffchainConfig.Len]) if err != nil { return nil, fmt.Errorf("failed to parse OffchainConfig blob from the program state: %w", err) } - signers, err := json.Marshal(extractSigners(state.Oracles)) + signers, err := json.Marshal(extractSigners(envelope.State.Oracles)) if err != nil { - return nil, fmt.Errorf("failed to parse signers: %w", err) + return nil, fmt.Errorf("failed to marshal signers: %w", err) } - transmitters, err := json.Marshal(extractTransmitters(state.Oracles)) + transmitters, err := json.Marshal(extractTransmitters(envelope.State.Oracles)) if err != nil { - return nil, fmt.Errorf("failed to parse transmitters: %w", err) + return nil, fmt.Errorf("failed to marshal transmitters: %w", err) } s, err := json.Marshal(int32ArrToInt64Arr(offchainConfig.S)) if err != nil { - return nil, fmt.Errorf("failed to parse s: %w", err) + return nil, fmt.Errorf("failed to marshal schedule: %w", err) } - - oracles, err := createConfigSetSimplifiedOracles(offchainConfig.OffchainPublicKeys, offchainConfig.PeerIds, state.Oracles) + oracles, err := createConfigSetSimplifiedOracles(offchainConfig.OffchainPublicKeys, offchainConfig.PeerIds, envelope.State.Oracles) if err != nil { - return nil, fmt.Errorf("failed to oracles s: %w", err) + return nil, fmt.Errorf("failed to encode oracle set: %w", err) } - out := map[string]interface{}{ - "config_digest": 
base64.StdEncoding.EncodeToString(state.Config.LatestConfigDigest[:]), + "config_digest": base64.StdEncoding.EncodeToString(envelope.State.Config.LatestConfigDigest[:]), "block_number": uint64ToBeBytes(envelope.BlockNumber), "signers": string(signers), "transmitters": string(transmitters), - "f": int32(state.Config.F), + "f": int32(envelope.State.Config.F), "delta_progress": uint64ToBeBytes(offchainConfig.DeltaProgressNanoseconds), "delta_resend": uint64ToBeBytes(offchainConfig.DeltaResendNanoseconds), "delta_round": uint64ToBeBytes(offchainConfig.DeltaRoundNanoseconds), @@ -195,10 +174,44 @@ func MakeSimplifiedConfigSetMapping( "r_max": int64(offchainConfig.RMax), "s": string(s), "oracles": string(oracles), - "feed_state_account": base64.StdEncoding.EncodeToString(feedConfig.StateAccount[:]), + "feed_state_account": base58.Encode(feedConfig.StateAccount[:]), } return out, nil +} +func MakeTransmissionMapping( + envelope TransmissionEnvelope, + solanaConfig config.Solana, + feedConfig config.Feed, +) (map[string]interface{}, error) { + data := []byte{} + if envelope.Answer.Data != nil { + data = envelope.Answer.Data.Bytes() + } + out := map[string]interface{}{ + "block_number": uint64ToBeBytes(envelope.BlockNumber), + "answer": map[string]interface{}{ + "data": data, + "timestamp": int64(envelope.Answer.Timestamp), + }, + "solana_chain_config": map[string]interface{}{ + "network_name": solanaConfig.NetworkName, + "network_id": solanaConfig.NetworkID, + "chain_id": solanaConfig.ChainID, + }, + "feed_config": map[string]interface{}{ + "feed_name": feedConfig.FeedName, + "feed_path": feedConfig.FeedPath, + "symbol": feedConfig.Symbol, + "heartbeat_sec": int64(feedConfig.HeartbeatSec), + "contract_type": feedConfig.ContractType, + "contract_status": feedConfig.ContractStatus, + "contract_address": feedConfig.ContractAddress.Bytes(), + "transmissions_account": feedConfig.TransmissionsAccount.Bytes(), + "state_account": feedConfig.StateAccount.Bytes(), + }, + } + return 
out, nil } // Helpers @@ -210,21 +223,19 @@ func uint64ToBeBytes(input uint64) []byte { } func extractSigners(oracles pkgSolana.Oracles) []interface{} { - out := []interface{}{} + out := make([]interface{}, oracles.Len) var i uint64 for i = 0; i < oracles.Len; i++ { - oracle := oracles.Raw[i] - out = append(out, oracle.Signer.Key[:]) + out[i] = oracles.Raw[i].Signer.Key[:] } return out } func extractTransmitters(oracles pkgSolana.Oracles) []interface{} { - out := []interface{}{} + out := make([]interface{}, oracles.Len) var i uint64 for i = 0; i < oracles.Len; i++ { - oracle := oracles.Raw[i] - out = append(out, oracle.Transmitter.Bytes()) + out[i] = oracles.Raw[i].Transmitter.Bytes() } return out } @@ -242,60 +253,55 @@ func parseNumericalMedianOffchainConfig(buf []byte) (*pb.NumericalMedianConfigPr } func formatOracles(oracles pkgSolana.Oracles) []interface{} { - out := []interface{}{} + out := make([]interface{}, oracles.Len) var i uint64 for i = 0; i < oracles.Len; i++ { - oracle := oracles.Raw[i] - out = append(out, map[string]interface{}{ - "transmitter": oracle.Transmitter[:], + out[i] = map[string]interface{}{ + "transmitter": oracles.Raw[i].Transmitter[:], "signer": map[string]interface{}{ - "key": oracle.Signer.Key[:], + "key": oracles.Raw[i].Signer.Key[:], }, - "payee": oracle.Payee[:], - "from_round_id": int64(oracle.FromRoundID), - "payment": uint64ToBeBytes(oracle.Payment), - }) + "payee": oracles.Raw[i].Payee[:], + "from_round_id": int64(oracles.Raw[i].FromRoundID), + "payment": uint64ToBeBytes(oracles.Raw[i].Payment), + } } return out } func formatLeftovers(leftovers pkgSolana.LeftoverPayments) []interface{} { - out := []interface{}{} + out := make([]interface{}, leftovers.Len) var i uint64 for i = 0; i < leftovers.Len; i++ { - leftover := leftovers.Raw[i] - out = append(out, map[string]interface{}{ - "payee": leftover.Payee[:], - "amount": uint64ToBeBytes(leftover.Amount), - }) + out[i] = map[string]interface{}{ + "payee": 
leftovers.Raw[i].Payee[:], + "amount": uint64ToBeBytes(leftovers.Raw[i].Amount), + } } return out } -func int32ArrToInt64Arr(in []uint32) []int64 { - out := []int64{} - for _, i := range in { - out = append(out, int64(i)) +func int32ArrToInt64Arr(xs []uint32) []int64 { + out := make([]int64, len(xs)) + for i, x := range xs { + out[i] = int64(x) } return out } -func createConfigSetSimplifiedOracles(offchainPublicKeys [][]byte, peerId []string, oracles pkgSolana.Oracles) ([]byte, error) { - if len(offchainPublicKeys) != len(peerId) && oracles.Len != uint64(len(peerId)) { - return nil, fmt.Errorf("length missmatch len(offchainPublicKeys)=%d , oracles.Len=%d, len(peerId)=%d", len(offchainPublicKeys), oracles.Len, len(peerId)) +func createConfigSetSimplifiedOracles(offchainPublicKeys [][]byte, peerIDs []string, oracles pkgSolana.Oracles) ([]byte, error) { + if len(offchainPublicKeys) != len(peerIDs) && oracles.Len != uint64(len(peerIDs)) { + return nil, fmt.Errorf("length missmatch len(offchainPublicKeys)=%d , oracles.Len=%d, len(peerIDs)=%d", len(offchainPublicKeys), oracles.Len, len(peerIDs)) } - var out []interface{} + out := make([]interface{}, oracles.Len) var i uint64 for i = 0; i < oracles.Len; i++ { - out = append(out, map[string]interface{}{ + out[i] = map[string]interface{}{ "transmitter": oracles.Raw[i].Transmitter, - "peer_id": peerId[i], + "peer_id": peerIDs[i], "offchain_public_key": offchainPublicKeys[i], - }) + } } s, err := json.Marshal(out) - if err != nil { - return nil, fmt.Errorf("failed to parse oracles: %w", err) - } - return s, nil + return s, err } diff --git a/pkg/monitoring/mapping_test.go b/pkg/monitoring/mapping_test.go index 40b77beee..ff6277832 100644 --- a/pkg/monitoring/mapping_test.go +++ b/pkg/monitoring/mapping_test.go @@ -1,11 +1,13 @@ package monitoring import ( + "encoding/base64" "encoding/json" "math/rand" "strings" "testing" + "github.com/mr-tron/base58" "github.com/stretchr/testify/require" ) @@ -14,8 +16,8 @@ 
func TestMapping(t *testing.T) { state, offchainConfig, numericalMedianOffchainConfig, err := generateState() require.NoError(t, err) envelope := StateEnvelope{ - State: state, - BlockNumber: rand.Uint64(), + state, // State: + rand.Uint64(), // BlockNumber: } solanaConfig := generateSolanaConfig() feedConfig := generateFeedConfig() @@ -178,6 +180,44 @@ func TestMapping(t *testing.T) { require.Equal(t, decodedFeedConfig["state_account"], feedConfig.StateAccount.Bytes()) }) + t.Run("MakeSimplifiedConfigSetMapping", func(t *testing.T) { + state, offchainConfig, _, err := generateState() + envelope := StateEnvelope{ + state, // State: + rand.Uint64(), // BlockNumber: + } + feedConfig := generateFeedConfig() + + mapping, err := MakeConfigSetSimplifiedMapping(envelope, feedConfig) + require.NoError(t, err) + var output []byte + serialized, err := configSetSimplifiedCodec.BinaryFromNative(output, mapping) + require.NoError(t, err) + deserialized, _, err := configSetSimplifiedCodec.NativeFromBinary(serialized) + require.NoError(t, err) + + configSetSimplified, ok := deserialized.(map[string]interface{}) + require.True(t, ok) + + oracles, err := createConfigSetSimplifiedOracles(offchainConfig.OffchainPublicKeys, offchainConfig.PeerIds, state.Oracles) + require.NoError(t, err) + + require.Equal(t, configSetSimplified["config_digest"], base64.StdEncoding.EncodeToString(state.Config.LatestConfigDigest[:])) + require.Equal(t, configSetSimplified["block_number"], uint64ToBeBytes(envelope.BlockNumber)) + require.Equal(t, configSetSimplified["delta_progress"], uint64ToBeBytes(offchainConfig.DeltaProgressNanoseconds)) + require.Equal(t, configSetSimplified["delta_resend"], uint64ToBeBytes(offchainConfig.DeltaResendNanoseconds)) + require.Equal(t, configSetSimplified["delta_round"], uint64ToBeBytes(offchainConfig.DeltaRoundNanoseconds)) + require.Equal(t, configSetSimplified["delta_grace"], uint64ToBeBytes(offchainConfig.DeltaGraceNanoseconds)) + require.Equal(t, 
configSetSimplified["delta_stage"], uint64ToBeBytes(offchainConfig.DeltaStageNanoseconds)) + require.Equal(t, configSetSimplified["r_max"], int64(offchainConfig.RMax)) + require.Equal(t, configSetSimplified["f"], int32(state.Config.F)) + require.Equal(t, configSetSimplified["signers"], jsonMarshalToString(t, extractSigners(state.Oracles))) + require.Equal(t, configSetSimplified["transmitters"], jsonMarshalToString(t, extractTransmitters(state.Oracles))) + require.Equal(t, configSetSimplified["s"], jsonMarshalToString(t, offchainConfig.S)) + require.Equal(t, configSetSimplified["oracles"], string(oracles)) + require.Equal(t, configSetSimplified["feed_state_account"], strings.Replace(jsonMarshalToString(t, base58.Encode(feedConfig.StateAccount.Bytes())), "\"", "", -1)) + }) + t.Run("MakeTransmissionMapping", func(t *testing.T) { initial := generateTransmissionEnvelope() solanaConfig := generateSolanaConfig() @@ -218,42 +258,6 @@ func TestMapping(t *testing.T) { require.Equal(t, decodedFeedConfig["transmissions_account"], feedConfig.TransmissionsAccount.Bytes()) require.Equal(t, decodedFeedConfig["state_account"], feedConfig.StateAccount.Bytes()) }) - t.Run("MakeSimplifiedConfigSetMapping", func(t *testing.T) { - state, offchainConfig, _, err := generateState() - envelope := StateEnvelope{ - State: state, - BlockNumber: rand.Uint64(), - } - feedConfig := generateFeedConfig() - - mapping, err := MakeSimplifiedConfigSetMapping(envelope, feedConfig) - require.NoError(t, err) - var output []byte - serialized, err := configSetSimplifiedCodec.BinaryFromNative(output, mapping) - require.NoError(t, err) - deserialized, _, err := configSetSimplifiedCodec.NativeFromBinary(serialized) - require.NoError(t, err) - - configSetSimplified, ok := deserialized.(map[string]interface{}) - require.True(t, ok) - - oracles, err := createConfigSetSimplifiedOracles(offchainConfig.OffchainPublicKeys, offchainConfig.PeerIds, state.Oracles) - require.NoError(t, err) - require.Equal(t, 
configSetSimplified["block_number"], uint64ToBeBytes(envelope.BlockNumber)) - require.Equal(t, configSetSimplified["delta_progress"], uint64ToBeBytes(offchainConfig.DeltaProgressNanoseconds)) - require.Equal(t, configSetSimplified["delta_resend"], uint64ToBeBytes(offchainConfig.DeltaResendNanoseconds)) - require.Equal(t, configSetSimplified["delta_round"], uint64ToBeBytes(offchainConfig.DeltaRoundNanoseconds)) - require.Equal(t, configSetSimplified["delta_grace"], uint64ToBeBytes(offchainConfig.DeltaGraceNanoseconds)) - require.Equal(t, configSetSimplified["delta_stage"], uint64ToBeBytes(offchainConfig.DeltaStageNanoseconds)) - require.Equal(t, configSetSimplified["r_max"], int64(offchainConfig.RMax)) - require.Equal(t, configSetSimplified["f"], int32(state.Config.F)) - require.Equal(t, configSetSimplified["signers"], jsonMarshalToString(t, extractSigners(state.Oracles))) - require.Equal(t, configSetSimplified["transmitters"], jsonMarshalToString(t, extractTransmitters(state.Oracles))) - require.Equal(t, configSetSimplified["s"], jsonMarshalToString(t, offchainConfig.S)) - require.Equal(t, configSetSimplified["oracles"], string(oracles)) - require.Equal(t, configSetSimplified["feed_state_account"], strings.Replace(jsonMarshalToString(t, feedConfig.StateAccount.Bytes()), "\"", "", -1)) - - }) } // Helpers diff --git a/pkg/monitoring/metrics.go b/pkg/monitoring/metrics.go index ae9e21c36..985581f5a 100644 --- a/pkg/monitoring/metrics.go +++ b/pkg/monitoring/metrics.go @@ -8,70 +8,63 @@ import ( type Metrics interface { SetHeadTrackerCurrentHead(blockNumber uint64, networkName, chainID, networkID string) - SetFeedContractMetadata(chainID, contractAddress, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol string) + SetFeedContractMetadata(chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol string) SetNodeMetadata(chainID, networkID, networkName, oracleName, sender string) - 
SetOffchainAggregatorAnswers(answer *big.Int, contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) - IncOffchainAggregatorAnswersTotal(contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) - SetOffchainAggregatorSubmissionReceivedValues(value *big.Int, contractAddress, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) - SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) + SetOffchainAggregatorAnswers(answer *big.Int, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) + IncOffchainAggregatorAnswersTotal(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) + SetOffchainAggregatorSubmissionReceivedValues(value *big.Int, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) + SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) } var ( headTrackerCurrentHead = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "head_tracker_current_head", - Namespace: "solana", - Help: "Tracks the current block height that the monitoring instance has processed.", + Name: "head_tracker_current_head", + Help: "Tracks the current block height that the monitoring instance has processed.", }, []string{"network_name", "chain_id", "network_id"}, ) feedContractMetadata = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "feed_contract_metadata", - Namespace: "solana", - Help: "Exposes metadata for individual feeds. 
It should simply be set to 1, as the relevant info is in the labels.", + Name: "feed_contract_metadata", + Help: "Exposes metadata for individual feeds. It should simply be set to 1, as the relevant info is in the labels.", }, - []string{"chain_id", "contract_address", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name", "symbol"}, + []string{"chain_id", "contract_address", "feed_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name", "symbol"}, ) nodeMetadata = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "node_metadata", - Namespace: "solana", - Help: "Exposes metadata for node operators. It should simply be set to 1, as the relevant info is in the labels.", + Name: "node_metadata", + Help: "Exposes metadata for node operators. It should simply be set to 1, as the relevant info is in the labels.", }, []string{"chain_id", "network_id", "network_name", "oracle_name", "sender"}, ) offchainAggregatorAnswers = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "offchain_aggregator_answers", - Namespace: "solana", - Help: "Reports the latest answer for a contract.", + Name: "offchain_aggregator_answers", + Help: "Reports the latest answer for a contract.", }, - []string{"contract_address", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, + []string{"contract_address", "feed_id", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, ) offchainAggregatorAnswersTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "offchain_aggregator_answers_total", - Namespace: "solana", - Help: "Bump this metric every time there is a transmission on chain.", + Name: "offchain_aggregator_answers_total", + Help: "Bump this metric every time there is a transmission on chain.", }, - []string{"contract_address", "chain_id", "contract_status", "contract_type", "feed_name", 
"feed_path", "network_id", "networks_name"}, + []string{"contract_address", "feed_id", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "networks_name"}, ) offchainAggregatorSubmissionReceivedValues = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "offchain_aggregator_submission_received_values", - Namespace: "solana", - Help: "Report individual node observations for the latest transmission on chain. (Should be 1 time series per node per contract)", + Name: "offchain_aggregator_submission_received_values", + Help: "Report individual node observations for the latest transmission on chain. (Should be 1 time series per node per contract)", }, - []string{"contract_address", "sender", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, + []string{"contract_address", "feed_id", "sender", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, ) offchainAggregatorAnswerStalled = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "offchain_aggregator_answer_stalled", - Namespace: "Solana", - Help: "Set to 1 if the heartbeat interval has passed on a feed without a transmission. Set to 0 otherwise.", + Name: "offchain_aggregator_answer_stalled", + Help: "Set to 1 if the heartbeat interval has passed on a feed without a transmission. 
Set to 0 otherwise.", }, - []string{"contract_address", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, + []string{"contract_address", "feed_id", "chain_id", "contract_status", "contract_type", "feed_name", "feed_path", "network_id", "network_name"}, ) ) @@ -95,30 +88,30 @@ func (d *defaultMetrics) SetHeadTrackerCurrentHead(blockNumber uint64, networkNa headTrackerCurrentHead.WithLabelValues(networkName, chainID, networkID).Set(float64(blockNumber)) } -func (d *defaultMetrics) SetFeedContractMetadata(chainID, contractAddress, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol string) { - feedContractMetadata.WithLabelValues(chainID, contractAddress, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol).Set(1) +func (d *defaultMetrics) SetFeedContractMetadata(chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol string) { + feedContractMetadata.WithLabelValues(chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol).Set(1) } func (d *defaultMetrics) SetNodeMetadata(chainID, networkID, networkName, oracleName, sender string) { nodeMetadata.WithLabelValues(chainID, networkID, networkName, oracleName, sender).Set(1) } -func (d *defaultMetrics) SetOffchainAggregatorAnswers(answer *big.Int, contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { - offchainAggregatorAnswers.WithLabelValues(contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(float64(answer.Int64())) +func (d *defaultMetrics) SetOffchainAggregatorAnswers(answer *big.Int, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { + offchainAggregatorAnswers.WithLabelValues(contractAddress, feedID, chainID, 
contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(float64(answer.Int64())) } -func (d *defaultMetrics) IncOffchainAggregatorAnswersTotal(contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { - offchainAggregatorAnswersTotal.WithLabelValues(contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Inc() +func (d *defaultMetrics) IncOffchainAggregatorAnswersTotal(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { + offchainAggregatorAnswersTotal.WithLabelValues(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Inc() } -func (d *defaultMetrics) SetOffchainAggregatorSubmissionReceivedValues(value *big.Int, contractAddress, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { - offchainAggregatorSubmissionReceivedValues.WithLabelValues(contractAddress, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(float64(value.Int64())) +func (d *defaultMetrics) SetOffchainAggregatorSubmissionReceivedValues(value *big.Int, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { + offchainAggregatorSubmissionReceivedValues.WithLabelValues(contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(float64(value.Int64())) } -func (d *defaultMetrics) SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { +func (d *defaultMetrics) SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { var value float64 = 0 if isSet { 
value = 1 } - offchainAggregatorAnswerStalled.WithLabelValues(contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(value) + offchainAggregatorAnswerStalled.WithLabelValues(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName).Set(value) } diff --git a/pkg/monitoring/multi_feed_monitor.go b/pkg/monitoring/multi_feed_monitor.go index 2bdd000e4..0f45df4ac 100644 --- a/pkg/monitoring/multi_feed_monitor.go +++ b/pkg/monitoring/multi_feed_monitor.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" "github.com/smartcontractkit/chainlink/core/logger" ) @@ -12,56 +13,88 @@ type MultiFeedMonitor interface { } func NewMultiFeedMonitor( + solanaConfig config.Solana, + feeds []config.Feed, + log logger.Logger, - config Config, transmissionReader, stateReader AccountReader, - transmissionSchema, stateSchema, configSetSimplifiedSchema Schema, producer Producer, metrics Metrics, + + configSetTopic string, + configSetSimplifiedTopic string, + transmissionTopic string, + + configSetSchema Schema, + configSetSimplifiedSchema Schema, + transmissionSchema Schema, ) MultiFeedMonitor { return &multiFeedMonitor{ + solanaConfig, + feeds, + log, - config, transmissionReader, stateReader, - transmissionSchema, stateSchema, configSetSimplifiedSchema, producer, metrics, + + configSetTopic, + configSetSimplifiedTopic, + transmissionTopic, + + configSetSchema, + configSetSimplifiedSchema, + transmissionSchema, } } type multiFeedMonitor struct { - log logger.Logger - config Config - transmissionReader AccountReader - stateReader AccountReader - transmissionSchema Schema - stateSchema Schema + solanaConfig config.Solana + feeds []config.Feed + + log logger.Logger + transmissionReader AccountReader + stateReader AccountReader + producer Producer + metrics Metrics + + configSetTopic string + configSetSimplifiedTopic 
string + transmissionTopic string + + configSetSchema Schema configSetSimplifiedSchema Schema - producer Producer - metrics Metrics + transmissionSchema Schema } const bufferCapacity = 100 // Start should be executed as a goroutine. func (m *multiFeedMonitor) Start(ctx context.Context, wg *sync.WaitGroup) { - wg.Add(len(m.config.Feeds)) - for _, feedConfig := range m.config.Feeds { - go func(feedConfig FeedConfig) { + wg.Add(len(m.feeds)) + for _, feedConfig := range m.feeds { + go func(feedConfig config.Feed) { defer wg.Done() + feedLogger := m.log.With( + "feed", feedConfig.FeedName, + "network", m.solanaConfig.NetworkName, + ) + transmissionPoller := NewPoller( - m.log.With("account", "transmissions", "address", feedConfig.TransmissionsAccount.String()), + feedLogger.With("component", "transmissions-poller", "address", feedConfig.TransmissionsAccount.String()), feedConfig.TransmissionsAccount, m.transmissionReader, - feedConfig.PollInterval, + m.solanaConfig.PollInterval, + m.solanaConfig.ReadTimeout, bufferCapacity, ) statePoller := NewPoller( - m.log.With("account", "state", "address", feedConfig.StateAccount.String()), + feedLogger.With("component", "state-poller", "address", feedConfig.StateAccount.String()), feedConfig.StateAccount, m.stateReader, - feedConfig.PollInterval, + m.solanaConfig.PollInterval, + m.solanaConfig.ReadTimeout, bufferCapacity, ) @@ -75,16 +108,35 @@ func (m *multiFeedMonitor) Start(ctx context.Context, wg *sync.WaitGroup) { statePoller.Start(ctx) }() + exporters := []Exporter{ + NewPrometheusExporter( + m.solanaConfig, + feedConfig, + feedLogger.With("component", "prometheus-exporter"), + m.metrics, + ), + NewKafkaExporter( + m.solanaConfig, + feedConfig, + feedLogger.With("component", "kafka-exporter"), + m.producer, + + m.configSetSchema, + m.configSetSimplifiedSchema, + m.transmissionSchema, + + m.configSetTopic, + m.configSetSimplifiedTopic, + m.transmissionTopic, + ), + } + feedMonitor := NewFeedMonitor( - m.log.With("name", 
feedConfig.FeedName, "network", m.config.Solana.NetworkName), - m.config, - feedConfig, + feedLogger.With("component", "feed-monitor"), transmissionPoller, statePoller, - m.transmissionSchema, m.stateSchema, m.configSetSimplifiedSchema, - m.producer, - m.metrics, + exporters, ) - feedMonitor.Start(ctx) + feedMonitor.Start(ctx, wg) }(feedConfig) } } diff --git a/pkg/monitoring/multi_feed_monitor_test.go b/pkg/monitoring/multi_feed_monitor_test.go index f80a1340f..96ad9633c 100644 --- a/pkg/monitoring/multi_feed_monitor_test.go +++ b/pkg/monitoring/multi_feed_monitor_test.go @@ -6,40 +6,50 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" "github.com/smartcontractkit/chainlink/core/logger" "github.com/stretchr/testify/require" ) const numFeeds = 10 -func TestMultiFeedMonitor(t *testing.T) { +func TestMultiFeedMonitorToMakeSureAllGoroutinesTerminate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() wg := &sync.WaitGroup{} - cfg := Config{} + cfg := config.Config{} + cfg.Solana.PollInterval = 5 * time.Second + feeds := []config.Feed{} for i := 0; i < numFeeds; i++ { - feedConfig := generateFeedConfig() - feedConfig.PollInterval = 5 * time.Second - cfg.Feeds = append(cfg.Feeds, feedConfig) + feeds = append(feeds, generateFeedConfig()) } - transmissionSchema := fakeSchema{transmissionCodec} - stateSchema := fakeSchema{configSetCodec} + configSetSchema := fakeSchema{configSetCodec} configSetSimplifiedSchema := fakeSchema{configSetSimplifiedCodec} + transmissionSchema := fakeSchema{transmissionCodec} - producer := fakeProducer{make(chan producerMessage)} + producer := fakeProducer{make(chan producerMessage), ctx} transmissionReader := &fakeReader{make(chan interface{})} stateReader := &fakeReader{make(chan interface{})} monitor := NewMultiFeedMonitor( + cfg.Solana, + feeds, + logger.NewNullLogger(), - cfg, transmissionReader, stateReader, - 
transmissionSchema, stateSchema, configSetSimplifiedSchema, producer, &devnullMetrics{}, + + cfg.Kafka.ConfigSetTopic, + cfg.Kafka.ConfigSetSimplifiedTopic, + cfg.Kafka.TransmissionTopic, + + configSetSchema, + configSetSimplifiedSchema, + transmissionSchema, ) go monitor.Start(ctx, wg) @@ -54,6 +64,10 @@ LOOP: trCount += 1 case stateReader.readCh <- StateEnvelope{newState, 100}: stCount += 1 + case <-ctx.Done(): + break LOOP + } + select { case message := <-producer.sendCh: messages = append(messages, message) case <-ctx.Done(): @@ -62,7 +76,87 @@ LOOP: } wg.Wait() - require.Equal(t, trCount, 10, "should only be able to do initial read of the latest transmission") - require.Equal(t, stCount, 10, "should only be able to do initial read of the state account") - require.Equal(t, len(messages), 30) + require.Equal(t, 10, trCount, "should only be able to do initial read of the latest transmission") + require.Equal(t, 10, stCount, "should only be able to do initial read of the state account") + require.Equal(t, 20, len(messages)) +} + +func TestMultiFeedMonitorForPerformance(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + wg := &sync.WaitGroup{} + + cfg := config.Config{} + cfg.Solana.PollInterval = 5 * time.Second + feeds := []config.Feed{} + for i := 0; i < numFeeds; i++ { + feeds = append(feeds, generateFeedConfig()) + } + + configSetSchema := fakeSchema{configSetCodec} + configSetSimplifiedSchema := fakeSchema{configSetSimplifiedCodec} + transmissionSchema := fakeSchema{transmissionCodec} + + producer := fakeProducer{make(chan producerMessage), ctx} + + transmissionReader := &fakeReader{make(chan interface{})} + stateReader := &fakeReader{make(chan interface{})} + + monitor := NewMultiFeedMonitor( + cfg.Solana, + feeds, + + logger.NewNullLogger(), + transmissionReader, stateReader, + producer, + &devnullMetrics{}, + + cfg.Kafka.ConfigSetTopic, + cfg.Kafka.ConfigSetSimplifiedTopic, + 
cfg.Kafka.TransmissionTopic, + + configSetSchema, + configSetSimplifiedSchema, + transmissionSchema, + ) + go monitor.Start(ctx, wg) + + trCount, stCount := 0, 0 + messages := []producerMessage{} + + wg.Add(1) + go func() { + defer wg.Done() + LOOP: + for { + newState, _, _, err := generateState() + require.NoError(t, err) + select { + case transmissionReader.readCh <- generateTransmissionEnvelope(): + trCount += 1 + case stateReader.readCh <- StateEnvelope{newState, 100}: + stCount += 1 + case <-ctx.Done(): + break LOOP + } + } + }() + wg.Add(1) + go func() { + defer wg.Done() + LOOP: + for { + select { + case message := <-producer.sendCh: + messages = append(messages, message) + case <-ctx.Done(): + break LOOP + } + } + }() + + wg.Wait() + require.Equal(t, 10, trCount, "should only be able to do initial read of the latest transmission") + require.Equal(t, 10, stCount, "should only be able to do initial read of the state account") + require.Equal(t, 30, len(messages)) } diff --git a/pkg/monitoring/poller.go b/pkg/monitoring/poller.go index b8d60f8cc..a700e9537 100644 --- a/pkg/monitoring/poller.go +++ b/pkg/monitoring/poller.go @@ -17,14 +17,16 @@ func NewPoller( log logger.Logger, account solana.PublicKey, reader AccountReader, - fetchInterval time.Duration, + pollInterval time.Duration, + readTimeout time.Duration, bufferCapacity uint32, ) Poller { return &solanaPollerImpl{ log, account, reader, - fetchInterval, + pollInterval, + readTimeout, bufferCapacity, make(chan interface{}, bufferCapacity), } @@ -34,7 +36,8 @@ type solanaPollerImpl struct { log logger.Logger account solana.PublicKey reader AccountReader - fetchInterval time.Duration + pollInterval time.Duration + readTimeout time.Duration bufferCapacity uint32 updates chan interface{} } @@ -50,19 +53,26 @@ func (s *solanaPollerImpl) Start(ctx context.Context) { s.updates <- data } + reusedTimer := time.NewTimer(s.pollInterval) for { - timer := time.NewTimer(s.fetchInterval) select { - case <-timer.C: - 
data, err := s.reader.Read(ctx, s.account) + case <-reusedTimer.C: + var data interface{} + var err error + func() { + ctx, cancel := context.WithTimeout(ctx, s.readTimeout) + defer cancel() + data, err = s.reader.Read(ctx, s.account) + }() if err != nil { s.log.Errorw("failed to read account contents", "error", err) continue } s.updates <- data + reusedTimer.Reset(s.pollInterval) case <-ctx.Done(): - if !timer.Stop() { - <-timer.C + if !reusedTimer.Stop() { + <-reusedTimer.C } s.log.Debug("poller closed") return diff --git a/pkg/monitoring/poller_test.go b/pkg/monitoring/poller_test.go index 5a639d942..43aa6b736 100644 --- a/pkg/monitoring/poller_test.go +++ b/pkg/monitoring/poller_test.go @@ -16,7 +16,8 @@ func TestPoller(t *testing.T) { name string duration time.Duration waitOnRead time.Duration - fetchInterval time.Duration + pollInterval time.Duration + readTimeout time.Duration processingTime time.Duration bufferCapacity uint32 countLower int @@ -27,6 +28,7 @@ func TestPoller(t *testing.T) { 1 * time.Second, 100 * time.Millisecond, 100 * time.Millisecond, + 100 * time.Millisecond, 0, 0, 4, @@ -37,16 +39,18 @@ func TestPoller(t *testing.T) { 1 * time.Second, 300 * time.Millisecond, 10 * time.Millisecond, + 10 * time.Millisecond, 0, 0, - 3, - 4, + 28, + 35, }, { "fast fetch, fast polling, insufficient buffering, tons of backpressure", 1 * time.Second, 10 * time.Millisecond, // Producer will make 1000/(10+10)=50 messages in a second. 10 * time.Millisecond, + 10 * time.Millisecond, 200 * time.Millisecond, // time it gets the "consumer" to process a message. It will only be able to process 1000/200=5 updates per second. 
5, 4, @@ -57,7 +61,12 @@ func TestPoller(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testCase.duration) defer cancel() reader := fakeReaderWithWait{testCase.waitOnRead} - poller := NewPoller(logger.NewNullLogger(), account, reader, testCase.fetchInterval, testCase.bufferCapacity) + poller := NewPoller( + logger.NewNullLogger(), + account, reader, + testCase.pollInterval, + testCase.readTimeout, + testCase.bufferCapacity) go poller.Start(ctx) readCount := 0 diff --git a/pkg/monitoring/producer.go b/pkg/monitoring/producer.go index 6b940cf63..2955d4385 100644 --- a/pkg/monitoring/producer.go +++ b/pkg/monitoring/producer.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" "github.com/smartcontractkit/chainlink/core/logger" ) @@ -17,10 +18,10 @@ type producer struct { log logger.Logger backend *kafka.Producer deliveryChan chan kafka.Event - cfg KafkaConfig + cfg config.Kafka } -func NewProducer(ctx context.Context, log logger.Logger, cfg KafkaConfig) (Producer, error) { +func NewProducer(ctx context.Context, log logger.Logger, cfg config.Kafka) (Producer, error) { backend, err := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": cfg.Brokers, "client.id": cfg.ClientID, @@ -56,7 +57,6 @@ func (p *producer) run(ctx context.Context) { } func (p *producer) Produce(key, value []byte, topic string) error { - return p.backend.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &topic, diff --git a/pkg/monitoring/schema.go b/pkg/monitoring/schema.go new file mode 100644 index 000000000..7af0a14c6 --- /dev/null +++ b/pkg/monitoring/schema.go @@ -0,0 +1,63 @@ +package monitoring + +import ( + "encoding/binary" + "fmt" + + "github.com/riferrei/srclient" +) + +type Schema interface { + ID() int + Version() int + Subject() string + Encode(interface{}) ([]byte, error) + Decode([]byte) 
(interface{}, error) +} + +type wrapSchema struct { + subject string + *srclient.Schema +} + +func (w wrapSchema) ID() int { + return w.Schema.ID() +} +func (w wrapSchema) Version() int { + return w.Schema.Version() +} + +func (w wrapSchema) Subject() string { + return w.subject +} + +func (w wrapSchema) Encode(value interface{}) ([]byte, error) { + payload, err := w.Schema.Codec().BinaryFromNative(nil, value) + if err != nil { + return nil, fmt.Errorf("failed to encode value in avro: %w", err) + } + schemaIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(schemaIDBytes, uint32(w.Schema.ID())) + + // Magic 0 byte + 4 bytes of schema ID + the data bytes + bytes := []byte{0} + bytes = append(bytes, schemaIDBytes...) + bytes = append(bytes, payload...) + return bytes, nil +} + +func (w wrapSchema) Decode(buf []byte) (interface{}, error) { + if buf[0] != 0 { + return nil, fmt.Errorf("magic byte not 0, instead is %d", buf[0]) + } + schemaID := int(binary.BigEndian.Uint32(buf[1:5])) + if schemaID != w.ID() { + return nil, fmt.Errorf("decoding message for a different schema, found schema id is %d but expected %d", schemaID, w.ID()) + } + value, _, err := w.Schema.Codec().NativeFromBinary(buf[5:]) + return value, err +} + +func (w wrapSchema) String() string { + return fmt.Sprintf("schema(subject=%s,id=%d,version=%d)", w.subject, w.Schema.ID(), w.Schema.Version()) +} diff --git a/pkg/monitoring/schema_registry.go b/pkg/monitoring/schema_registry.go index 1b2d1478c..7dcb06184 100644 --- a/pkg/monitoring/schema_registry.go +++ b/pkg/monitoring/schema_registry.go @@ -1,12 +1,12 @@ package monitoring import ( - "encoding/binary" "encoding/json" "fmt" "strings" "github.com/riferrei/srclient" + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" "github.com/smartcontractkit/chainlink/core/logger" "github.com/stretchr/testify/assert" ) @@ -20,11 +20,11 @@ type SchemaRegistry interface { } type schemaRegistry struct { - backend 
*srclient.SchemaRegistryClient + backend srclient.ISchemaRegistryClient log logger.Logger } -func NewSchemaRegistry(cfg SchemaRegistryConfig, log logger.Logger) SchemaRegistry { +func NewSchemaRegistry(cfg config.SchemaRegistry, log logger.Logger) SchemaRegistry { backend := srclient.CreateSchemaRegistryClient(cfg.URL) if cfg.Username != "" && cfg.Password != "" { backend.SetCredentials(cfg.Username, cfg.Password) @@ -43,7 +43,7 @@ func (s *schemaRegistry) EnsureSchema(subject, spec string) (Schema, error) { if err != nil { return nil, fmt.Errorf("unable to create new schema with subject '%s': %w", subject, err) } - return wrapSchema{newSchema}, nil + return wrapSchema{subject, newSchema}, nil } isEqualSchemas, errInIsEqualJSON := isEqualJSON(registeredSchema.Schema(), spec) if errInIsEqualJSON != nil { @@ -51,44 +51,14 @@ func (s *schemaRegistry) EnsureSchema(subject, spec string) (Schema, error) { } if isEqualSchemas { s.log.Infof("using existing schema for subject '%s'\n", subject) - return wrapSchema{registeredSchema}, nil + return wrapSchema{subject, registeredSchema}, nil } s.log.Infof("updating schema for subject '%s'\n", subject) newSchema, err := s.backend.CreateSchema(subject, spec, srclient.Avro) if err != nil { return nil, fmt.Errorf("unable to update schema with subject '%s': %w", subject, err) } - return wrapSchema{newSchema}, nil -} - -type Schema interface { - Encode(interface{}) ([]byte, error) - Decode([]byte) (interface{}, error) -} - -type wrapSchema struct { - *srclient.Schema -} - -func (w wrapSchema) Encode(value interface{}) ([]byte, error) { - payload, err := w.Schema.Codec().BinaryFromNative(nil, value) - if err != nil { - return nil, fmt.Errorf("failed to encode value in avro: %w", err) - } - schemaIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(schemaIDBytes, uint32(w.Schema.ID())) - - // Magic 0 byte + 4 bytes of schema ID + the data bytes - bytes := []byte{0} - bytes = append(bytes, schemaIDBytes...) 
- bytes = append(bytes, payload...) - return bytes, nil -} - -func (w wrapSchema) Decode(buf []byte) (interface{}, error) { - // TODO add the decode for tests later - value, _, err := w.Schema.Codec().NativeFromBinary(buf) - return value, err + return wrapSchema{subject, newSchema}, nil } // Helpers diff --git a/pkg/monitoring/schema_registry_test.go b/pkg/monitoring/schema_registry_test.go new file mode 100644 index 000000000..fece3957c --- /dev/null +++ b/pkg/monitoring/schema_registry_test.go @@ -0,0 +1,65 @@ +package monitoring + +import ( + "testing" + + "github.com/riferrei/srclient" + "github.com/smartcontractkit/chainlink/core/logger" + "github.com/stretchr/testify/require" +) + +const baseSchema = ` +{"name": "person", "type": "record", "fields": [ + {"name": "name", "type": "string"} +]}` + +const extendedSchema = ` +{"name": "person", "type": "record", "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"} +]}` + +func TestSchemaRegistry(t *testing.T) { + t.Run("EnsureSchema", func(t *testing.T) { + client := srclient.CreateMockSchemaRegistryClient("http://127.0.0.1:6767") + registry := &schemaRegistry{client, logger.NewNullLogger()} + + // Note: because the mock schema registry panics(!) upon calling GetLatestSchema() + // when querying for an inexistent subject, we can't test the case where the + // schema does not exist and is first created by EnsureSchema! 
+ newSchema, err := client.CreateSchema("config_set", baseSchema, srclient.Avro) + require.NoError(t, err, "no error when creating the schema") + + existingSchema, err := registry.EnsureSchema("config_set", baseSchema) + require.NoError(t, err, "no error when fetching existing schema") + require.Equal(t, newSchema.ID(), existingSchema.ID(), "should return the same schema ID") + require.Equal(t, newSchema.Version(), existingSchema.Version(), "should return the same schema version") + + extendedSchema, err := registry.EnsureSchema("config_set", extendedSchema) + require.NoError(t, err, "no error when extending existing schema") + require.Equal(t, existingSchema.ID()+1, extendedSchema.ID(), "should bump the schema ID") + require.Equal(t, existingSchema.Version()+1, extendedSchema.Version(), "should bump the version after a schema update") + }) + t.Run("Encode/Decode", func(t *testing.T) { + client := srclient.CreateMockSchemaRegistryClient("http://127.0.0.1:6767") + registry := &schemaRegistry{client, logger.NewNullLogger()} + _, err := client.CreateSchema("person", baseSchema, srclient.Avro) + require.NoError(t, err) + schema, err := registry.EnsureSchema("person", baseSchema) + require.NoError(t, err) + + subject := map[string]interface{}{"name": "test"} + expectedEncoded := []byte{ + 0x0, // "magic" byte + 0x0, 0x0, 0x0, 0x1, // 4 bytes for schema id + 0x8, 0x74, 0x65, 0x73, 0x74, // avro-encoded payload + } + encoded, err := schema.Encode(subject) + require.NoError(t, err) + require.Equal(t, expectedEncoded, encoded) + + decoded, err := schema.Decode(encoded) + require.NoError(t, err) + require.Equal(t, subject, decoded) + }) +} diff --git a/pkg/monitoring/schemas_test.go b/pkg/monitoring/schemas_test.go new file mode 100644 index 000000000..273e2a140 --- /dev/null +++ b/pkg/monitoring/schemas_test.go @@ -0,0 +1,33 @@ +package monitoring + +import ( + "testing" + + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" + 
"github.com/stretchr/testify/require" +) + +func TestSchemas(t *testing.T) { + solanaConfig := config.Solana{} + feedConfig := config.Feed{} + transmission := TransmissionEnvelope{} + state := StateEnvelope{} + t.Run("encode an empty configSet message", func(t *testing.T) { + mapping, err := MakeConfigSetMapping(state, solanaConfig, feedConfig) + require.NoError(t, err) + _, err = configSetCodec.BinaryFromNative(nil, mapping) + require.NoError(t, err) + }) + t.Run("encode an empty configSetSimplified message", func(t *testing.T) { + mapping, err := MakeConfigSetSimplifiedMapping(state, feedConfig) + require.NoError(t, err) + _, err = configSetSimplifiedCodec.BinaryFromNative(nil, mapping) + require.NoError(t, err) + }) + t.Run("encode an empty transmission message", func(t *testing.T) { + mapping, err := MakeTransmissionMapping(transmission, solanaConfig, feedConfig) + require.NoError(t, err) + _, err = transmissionCodec.BinaryFromNative(nil, mapping) + require.NoError(t, err) + }) +} diff --git a/pkg/monitoring/testutils.go b/pkg/monitoring/testutils.go index 41703334a..072a8cef0 100644 --- a/pkg/monitoring/testutils.go +++ b/pkg/monitoring/testutils.go @@ -13,6 +13,7 @@ import ( gbinary "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" "github.com/linkedin/goavro" + "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/config" "github.com/smartcontractkit/chainlink-solana/pkg/monitoring/pb" pkgSolana "github.com/smartcontractkit/chainlink-solana/pkg/solana" "github.com/smartcontractkit/chainlink/core/logger" @@ -55,16 +56,18 @@ func generateOffchainConfig(oracles [19]pkgSolana.Oracle, numOracles int) ( peerIDs = append(peerIDs, fmt.Sprintf("peer#%d", i)) } config := &pb.OffchainConfigProto{ - DeltaProgressNanoseconds: rand.Uint64(), - DeltaResendNanoseconds: rand.Uint64(), - DeltaRoundNanoseconds: rand.Uint64(), - DeltaGraceNanoseconds: rand.Uint64(), - 
DeltaStageNanoseconds: rand.Uint64(), - RMax: rand.Uint32(), - S: schedule, - OffchainPublicKeys: offchainPublicKeys, - PeerIds: peerIDs, - ReportingPluginConfig: encodedNumericalMedianOffchainConfig, + DeltaProgressNanoseconds: rand.Uint64(), + DeltaResendNanoseconds: rand.Uint64(), + DeltaRoundNanoseconds: rand.Uint64(), + DeltaGraceNanoseconds: rand.Uint64(), + DeltaStageNanoseconds: rand.Uint64(), + + RMax: rand.Uint32(), + S: schedule, + OffchainPublicKeys: offchainPublicKeys, + PeerIds: peerIDs, + ReportingPluginConfig: encodedNumericalMedianOffchainConfig, + MaxDurationQueryNanoseconds: rand.Uint64(), MaxDurationObservationNanoseconds: rand.Uint64(), MaxDurationReportNanoseconds: rand.Uint64(), @@ -168,19 +171,21 @@ func generateState() ( return state, offchainConfig, numericalMedianConfig, nil } -func generateSolanaConfig() SolanaConfig { - return SolanaConfig{ - RPCEndpoint: "http://solana:6969", - NetworkName: "solana-mainnet-beta", - NetworkID: "1", - ChainID: "solana-mainnet-beta", +func generateSolanaConfig() config.Solana { + return config.Solana{ + RPCEndpoint: "http://solana:6969", + NetworkName: "solana-mainnet-beta", + NetworkID: "1", + ChainID: "solana-mainnet-beta", + ReadTimeout: 100 * time.Millisecond, + PollInterval: time.Duration(1+rand.Intn(5)) * time.Second, } } -func generateFeedConfig() FeedConfig { +func generateFeedConfig() config.Feed { coins := []string{"btc", "eth", "matic", "link", "avax", "ftt", "srm", "usdc", "sol", "ray"} coin := coins[rand.Intn(len(coins))] - return FeedConfig{ + return config.Feed{ FeedName: fmt.Sprintf("%s / usd", coin), FeedPath: fmt.Sprintf("%s-usd", coin), Symbol: "$", @@ -191,8 +196,6 @@ func generateFeedConfig() FeedConfig { ContractAddress: generatePublicKey(), TransmissionsAccount: generatePublicKey(), StateAccount: generatePublicKey(), - - PollInterval: time.Duration(1+rand.Intn(5)) * time.Second, } } @@ -262,7 +265,11 @@ func NewRandomDataReader(ctx context.Context, wg *sync.WaitGroup, typ string, lo 
} func (f *fakeReader) Read(ctx context.Context, _ solana.PublicKey) (interface{}, error) { - ans := <-f.readCh + var ans interface{} + select { + case ans = <-f.readCh: + case <-ctx.Done(): + } return ans, nil } @@ -286,7 +293,7 @@ func (f *fakeReader) runRandomDataGenerator(ctx context.Context, typ string, log } select { case f.readCh <- payload: - log.Infof("sent payload of type %s", typ) + log.Infof("generate data for account of type %s", typ) case <-ctx.Done(): return } @@ -297,10 +304,14 @@ type producerMessage struct{ key, value []byte } type fakeProducer struct { sendCh chan producerMessage + ctx context.Context } func (f fakeProducer) Produce(key, value []byte, topic string) error { - f.sendCh <- producerMessage{key, value} + select { + case f.sendCh <- producerMessage{key, value}: + case <-f.ctx.Done(): + } return nil } @@ -308,6 +319,18 @@ type fakeSchema struct { codec *goavro.Codec } +func (f fakeSchema) ID() int { + return 1 +} + +func (f fakeSchema) Version() int { + return 1 +} + +func (f fakeSchema) Subject() string { + return "n/a" +} + func (f fakeSchema) Encode(value interface{}) ([]byte, error) { return f.codec.BinaryFromNative(nil, value) } @@ -324,22 +347,40 @@ var _ Metrics = (*devnullMetrics)(nil) func (d *devnullMetrics) SetHeadTrackerCurrentHead(blockNumber uint64, networkName, chainID, networkID string) { } -func (d *devnullMetrics) SetFeedContractMetadata(chainID, contractAddress, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol string) { +func (d *devnullMetrics) SetFeedContractMetadata(chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol string) { } func (d *devnullMetrics) SetNodeMetadata(chainID, networkID, networkName, oracleName, sender string) { } -func (d *devnullMetrics) SetOffchainAggregatorAnswers(answer *big.Int, contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { +func (d 
*devnullMetrics) SetOffchainAggregatorAnswers(answer *big.Int, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { } -func (d *devnullMetrics) IncOffchainAggregatorAnswersTotal(contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { +func (d *devnullMetrics) IncOffchainAggregatorAnswersTotal(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { +} + +func (d *devnullMetrics) SetOffchainAggregatorSubmissionReceivedValues(value *big.Int, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { +} + +func (d *devnullMetrics) SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { +} + +type keepLatestMetrics struct { + *devnullMetrics + + latestTransmission *big.Int + latestTransmitter string } -func (d *devnullMetrics) SetOffchainAggregatorSubmissionReceivedValues(value *big.Int, contractAddress, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { +func (k *keepLatestMetrics) SetOffchainAggregatorAnswers(answer *big.Int, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { + k.latestTransmission = &big.Int{} + k.latestTransmission.Set(answer) } -func (d *devnullMetrics) SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { +func (k *keepLatestMetrics) SetOffchainAggregatorSubmissionReceivedValues(value *big.Int, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string) { + k.latestTransmission = &big.Int{} + k.latestTransmission.Set(value) + 
 k.latestTransmitter = sender } // These utilities are used primarily in tests but are present in the monitoring package because they are not inside a file ending in _test.go. @@ -350,4 +391,5 @@ var ( _ = generateFeedConfig _ = fakeProducer{} _ = fakeSchema{} + _ = keepLatestMetrics{} )