Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Db syncing for registry syncer #13756

Merged
merged 76 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
5fcae4f
Adds migration for syncer state data
vyzaldysanchez Jul 4, 2024
f868290
Implements syncer ORM
vyzaldysanchez Jul 4, 2024
62280d1
Implements syncing with local registry and properly updating it
vyzaldysanchez Jul 4, 2024
0e2e504
Fixes db sync channel
vyzaldysanchez Jul 4, 2024
0b9256c
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 4, 2024
4902d1e
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 8, 2024
d17cb2b
Connects DB to syncer constructor
vyzaldysanchez Jul 8, 2024
14dabe2
Adds changeset
vyzaldysanchez Jul 8, 2024
cd6ce1f
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 9, 2024
0762e38
Fixes changeset
vyzaldysanchez Jul 9, 2024
4279fb5
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 9, 2024
fcaf52b
Uses custom marshal JSON call on DB store call
vyzaldysanchez Jul 9, 2024
7574557
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 11, 2024
01f5018
Fixes errors from merge conflicts
vyzaldysanchez Jul 11, 2024
f4e9ecf
Fixes existing tests
vyzaldysanchez Jul 11, 2024
aae187a
Prevents tests from hanging
vyzaldysanchez Jul 11, 2024
e8443de
Fixes lint issue
vyzaldysanchez Jul 11, 2024
5dcff70
Fixes setup on `New()` call
vyzaldysanchez Jul 11, 2024
176d555
Implements marshal/unnmarshal mechanics on syncer state
vyzaldysanchez Jul 14, 2024
dc1969d
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 14, 2024
19e7f49
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 14, 2024
ad71ef9
Fixes migration name
vyzaldysanchez Jul 14, 2024
702b463
Fixes lint
vyzaldysanchez Jul 14, 2024
e00036e
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 17, 2024
43c94a5
Keeps the latest 10 records on `registry_syncer_states`
vyzaldysanchez Jul 17, 2024
a7971dd
Adds ORM tests
vyzaldysanchez Jul 17, 2024
d04977e
Prevents possible flake
vyzaldysanchez Jul 17, 2024
615f6da
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 17, 2024
9ed68fe
Merge remote-tracking branch 'origin/develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 30, 2024
a311bae
Fixes errors from conflict
vyzaldysanchez Jul 30, 2024
523e691
Merge remote-tracking branch 'origin/develop' into task/KS-297/db-syncer
vyzaldysanchez Jul 30, 2024
7dda79c
Fixes syncer tests
vyzaldysanchez Jul 30, 2024
3e5bc5f
Fixes syncer ORM tests
vyzaldysanchez Jul 30, 2024
1b8f80b
Fixes linter
vyzaldysanchez Jul 30, 2024
6fcd67a
Fixes tests
vyzaldysanchez Jul 30, 2024
4df31ac
Fixes tests
vyzaldysanchez Jul 30, 2024
868aee4
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 1, 2024
8ec7011
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 6, 2024
a1c7e1e
Review changes
vyzaldysanchez Aug 6, 2024
1bea6cc
Fixes tests
vyzaldysanchez Aug 6, 2024
6995a56
Fixes lint
vyzaldysanchez Aug 6, 2024
0b5e932
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 6, 2024
4ee8b9b
Improves implementation
vyzaldysanchez Aug 6, 2024
93bd305
Removes unused `to32Byte` func
vyzaldysanchez Aug 6, 2024
1d1e4b6
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 6, 2024
0800c31
fixes errors on `Close()`
vyzaldysanchez Aug 6, 2024
0c3424e
Adds more custom types to avoid data races
vyzaldysanchez Aug 6, 2024
d61c935
Update tests
vyzaldysanchez Aug 6, 2024
a0eef06
fixes lint
vyzaldysanchez Aug 6, 2024
d657f9a
Removes unnecessary comments
vyzaldysanchez Aug 6, 2024
230c965
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 6, 2024
78ca549
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 6, 2024
5393ab2
Sends deep copy of local registry to launchers
vyzaldysanchez Aug 6, 2024
4a71dfd
Fixes linter
vyzaldysanchez Aug 6, 2024
51d98d6
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 6, 2024
975f087
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 7, 2024
4b80e9b
Uses interface for syncer ORM
vyzaldysanchez Aug 7, 2024
524680d
Mocks the ORM for tests
vyzaldysanchez Aug 8, 2024
b21a2aa
Improves `syncer.Close()` mechanism
vyzaldysanchez Aug 8, 2024
c2581bd
Merge remote-tracking branch 'origin/develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 8, 2024
bbac20c
Fixes merge conflicts
vyzaldysanchez Aug 8, 2024
ee3beb9
Fixes test
vyzaldysanchez Aug 8, 2024
88cb3ff
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 8, 2024
c822f70
Fixes linter
vyzaldysanchez Aug 8, 2024
ea33928
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 12, 2024
54de5e6
Fixes race condition with `syncer.reader`
vyzaldysanchez Aug 12, 2024
330dd62
Fixes race condition in test
vyzaldysanchez Aug 12, 2024
4c452e4
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 13, 2024
11d8a8b
Fixes race condition in test
vyzaldysanchez Aug 13, 2024
1d65a23
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 13, 2024
8d26d02
Merge branch 'develop' into task/KS-297/db-syncer
bolekk Aug 14, 2024
5a6557e
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 14, 2024
ef5f3c7
Merge branch 'develop' into task/KS-297/db-syncer
bolekk Aug 14, 2024
dca3aec
Merge branch 'develop' into task/KS-297/db-syncer
bolekk Aug 15, 2024
ce70757
Merge branch 'develop' into task/KS-297/db-syncer
vyzaldysanchez Aug 15, 2024
eee39a2
Fixes test
vyzaldysanchez Aug 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/many-knives-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#updated Adds DB syncing for registry syncer
5 changes: 4 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -552,4 +552,7 @@ packages:
interfaces:
Reader:
config:
mockname: "Mock{{ .InterfaceName }}"
mockname: "Mock{{ .InterfaceName }}"
github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer:
interfaces:
ORM:
2 changes: 2 additions & 0 deletions core/capabilities/ccip/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/smartcontractkit/chainlink-common/pkg/loop"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/common"
configsevm "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/configs/evm"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/launcher"
Expand Down Expand Up @@ -119,6 +120,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services
},
relayer,
cfg.ExternalRegistry().Address(),
registrysyncer.NewORM(d.ds, d.lggr),
)
if err != nil {
return nil, fmt.Errorf("could not configure syncer: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions core/capabilities/ccip/launcher/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

it "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/ccip_integration_tests/integrationhelpers"
cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"

"github.com/onsi/gomega"
Expand All @@ -30,12 +31,14 @@ func TestIntegration_Launcher(t *testing.T) {
p2pIDs := it.P2pIDsFromInts(arr)
uni.AddCapability(p2pIDs)

db := pgtest.NewSqlxDB(t)
regSyncer, err := registrysyncer.New(lggr,
func() (p2ptypes.PeerID, error) {
return p2pIDs[0], nil
},
uni,
uni.CapReg.Address().String(),
registrysyncer.NewORM(db, lggr),
)
require.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
},
relayer,
registryAddress,
registrysyncer.NewORM(opts.DS, globalLogger),
)
if err != nil {
return nil, fmt.Errorf("could not configure syncer: %w", err)
Expand Down
16 changes: 16 additions & 0 deletions core/services/registrysyncer/local_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ type LocalRegistry struct {
IDsToCapabilities map[string]Capability
}

func NewLocalRegistry(
lggr logger.Logger,
getPeerID func() (p2ptypes.PeerID, error),
IDsToDONs map[DonID]DON,
IDsToNodes map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo,
IDsToCapabilities map[string]Capability,
) LocalRegistry {
return LocalRegistry{
lggr: lggr.Named("LocalRegistry"),
getPeerID: getPeerID,
IDsToDONs: IDsToDONs,
IDsToNodes: IDsToNodes,
IDsToCapabilities: IDsToCapabilities,
}
}

func (l *LocalRegistry) LocalNode(ctx context.Context) (capabilities.Node, error) {
// Load the current nodes PeerWrapper, this gets us the current node's
// PeerID, allowing us to contextualize registry information in terms of DON ownership
Expand Down
142 changes: 142 additions & 0 deletions core/services/registrysyncer/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

167 changes: 167 additions & 0 deletions core/services/registrysyncer/orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package registrysyncer

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"math/big"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

type capabilitiesRegistryNodeInfo struct {
NodeOperatorId uint32 `json:"nodeOperatorId"`
ConfigCount uint32 `json:"configCount"`
WorkflowDONId uint32 `json:"workflowDONId"`
Signer p2ptypes.PeerID `json:"signer"`
P2pId p2ptypes.PeerID `json:"p2pId"`
HashedCapabilityIds []p2ptypes.PeerID `json:"hashedCapabilityIds"`
CapabilitiesDONIds []string `json:"capabilitiesDONIds"`
}

func (l *LocalRegistry) MarshalJSON() ([]byte, error) {
idsToNodes := make(map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo)
for k, v := range l.IDsToNodes {
hashedCapabilityIds := make([]p2ptypes.PeerID, len(v.HashedCapabilityIds))
for i, id := range v.HashedCapabilityIds {
hashedCapabilityIds[i] = p2ptypes.PeerID(id[:])
}
capabilitiesDONIds := make([]string, len(v.CapabilitiesDONIds))
for i, id := range v.CapabilitiesDONIds {
capabilitiesDONIds[i] = id.String()
}
idsToNodes[k] = capabilitiesRegistryNodeInfo{
NodeOperatorId: v.NodeOperatorId,
ConfigCount: v.ConfigCount,
WorkflowDONId: v.WorkflowDONId,
Signer: p2ptypes.PeerID(v.Signer[:]),
P2pId: p2ptypes.PeerID(v.P2pId[:]),
HashedCapabilityIds: hashedCapabilityIds,
CapabilitiesDONIds: capabilitiesDONIds,
}
}

b, err := json.Marshal(&struct {
IDsToDONs map[DonID]DON
IDsToNodes map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo
IDsToCapabilities map[string]Capability
}{
IDsToDONs: l.IDsToDONs,
IDsToNodes: idsToNodes,
IDsToCapabilities: l.IDsToCapabilities,
})
if err != nil {
return []byte{}, err
}
return b, nil
}

func (l *LocalRegistry) UnmarshalJSON(data []byte) error {
temp := struct {
IDsToDONs map[DonID]DON
IDsToNodes map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo
IDsToCapabilities map[string]Capability
}{
IDsToDONs: make(map[DonID]DON),
IDsToNodes: make(map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo),
IDsToCapabilities: make(map[string]Capability),
}

if err := json.Unmarshal(data, &temp); err != nil {
return fmt.Errorf("failed to unmarshal state: %w", err)
}

l.IDsToDONs = temp.IDsToDONs

l.IDsToNodes = make(map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo)
for peerID, v := range temp.IDsToNodes {
hashedCapabilityIds := make([][32]byte, len(v.HashedCapabilityIds))
for i, id := range v.HashedCapabilityIds {
copy(hashedCapabilityIds[i][:], id[:])
}

capabilitiesDONIds := make([]*big.Int, len(v.CapabilitiesDONIds))
for i, id := range v.CapabilitiesDONIds {
bigInt := new(big.Int)
bigInt.SetString(id, 10)
capabilitiesDONIds[i] = bigInt
}
l.IDsToNodes[peerID] = kcr.CapabilitiesRegistryNodeInfo{
NodeOperatorId: v.NodeOperatorId,
ConfigCount: v.ConfigCount,
WorkflowDONId: v.WorkflowDONId,
Signer: v.Signer,
P2pId: v.P2pId,
HashedCapabilityIds: hashedCapabilityIds,
CapabilitiesDONIds: capabilitiesDONIds,
}
}

l.IDsToCapabilities = temp.IDsToCapabilities

return nil
}

type ORM interface {
AddLocalRegistry(ctx context.Context, localRegistry LocalRegistry) error
LatestLocalRegistry(ctx context.Context) (*LocalRegistry, error)
}

type orm struct {
ds sqlutil.DataSource
lggr logger.Logger
}

var _ ORM = (*orm)(nil)

func NewORM(ds sqlutil.DataSource, lggr logger.Logger) orm {
namedLogger := lggr.Named("RegistrySyncerORM")
return orm{
ds: ds,
lggr: namedLogger,
}
}

func (orm orm) AddLocalRegistry(ctx context.Context, localRegistry LocalRegistry) error {
return sqlutil.TransactDataSource(ctx, orm.ds, nil, func(tx sqlutil.DataSource) error {
localRegistryJSON, err := localRegistry.MarshalJSON()
if err != nil {
return err
}
hash := sha256.Sum256(localRegistryJSON)
_, err = tx.ExecContext(
ctx,
`INSERT INTO registry_syncer_states (data, data_hash) VALUES ($1, $2) ON CONFLICT (data_hash) DO NOTHING`,
localRegistryJSON, fmt.Sprintf("%x", hash[:]),
)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, `DELETE FROM registry_syncer_states
WHERE data_hash NOT IN (
SELECT data_hash FROM registry_syncer_states
ORDER BY id DESC
LIMIT 10
);`)
return err
})
}

func (orm orm) LatestLocalRegistry(ctx context.Context) (*LocalRegistry, error) {
var localRegistry LocalRegistry
var localRegistryJSON string
err := orm.ds.GetContext(ctx, &localRegistryJSON, `SELECT data FROM registry_syncer_states ORDER BY id DESC LIMIT 1`)
if err != nil {
return nil, err
}
err = localRegistry.UnmarshalJSON([]byte(localRegistryJSON))
if err != nil {
return nil, err
}
return &localRegistry, nil
}
Loading
Loading