-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Log Event Trigger Capability Development: Part 3 (#14637)
* Use default mode aggregator for all non-streams triggers * F+1 idential responses * Test for remote trigger subscriber shim * Updated comments * Interface needed for the test
- Loading branch information
1 parent
c654322
commit 3738ee4
Showing
3 changed files
with
258 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,12 +15,16 @@ import ( | |
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" | ||
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" | ||
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests" | ||
"github.com/smartcontractkit/chainlink-common/pkg/values" | ||
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" | ||
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" | ||
remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" | ||
kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" | ||
"github.com/smartcontractkit/chainlink/v2/core/logger" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" | ||
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks" | ||
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" | ||
|
@@ -184,6 +188,234 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) { | |
defer launcher.Close() | ||
} | ||
|
||
func newTriggerEventMsg(t *testing.T, | ||
senderPeerID types.PeerID, | ||
workflowID string, | ||
triggerEvent map[string]any, | ||
triggerEventID string) (*remotetypes.MessageBody, *values.Map) { | ||
triggerEventValue, err := values.NewMap(triggerEvent) | ||
require.NoError(t, err) | ||
capResponse := capabilities.TriggerResponse{ | ||
Event: capabilities.TriggerEvent{ | ||
Outputs: triggerEventValue, | ||
ID: triggerEventID, | ||
}, | ||
Err: nil, | ||
} | ||
marshaled, err := pb.MarshalTriggerResponse(capResponse) | ||
require.NoError(t, err) | ||
return &remotetypes.MessageBody{ | ||
Sender: senderPeerID[:], | ||
Method: remotetypes.MethodTriggerEvent, | ||
Metadata: &remotetypes.MessageBody_TriggerEventMetadata{ | ||
TriggerEventMetadata: &remotetypes.TriggerEventMetadata{ | ||
WorkflowIds: []string{workflowID}, | ||
}, | ||
}, | ||
Payload: marshaled, | ||
}, triggerEventValue | ||
} | ||
|
||
func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) { | ||
ctx := tests.Context(t) | ||
lggr := logger.TestLogger(t) | ||
registry := NewRegistry(lggr) | ||
dispatcher := remoteMocks.NewDispatcher(t) | ||
|
||
var pid ragetypes.PeerID | ||
err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) | ||
require.NoError(t, err) | ||
peer := mocks.NewPeer(t) | ||
peer.On("UpdateConnections", mock.Anything).Return(nil) | ||
peer.On("ID").Return(pid) | ||
wrapper := mocks.NewPeerWrapper(t) | ||
wrapper.On("GetPeer").Return(peer) | ||
|
||
workflowDonNodes := []ragetypes.PeerID{ | ||
pid, | ||
randomWord(), | ||
randomWord(), | ||
randomWord(), | ||
} | ||
|
||
capabilityDonNodes := []ragetypes.PeerID{ | ||
randomWord(), | ||
randomWord(), | ||
randomWord(), | ||
randomWord(), | ||
} | ||
|
||
fullTriggerCapID := "[email protected]" | ||
fullTargetID := "[email protected]" | ||
triggerCapID := randomWord() | ||
targetCapID := randomWord() | ||
dID := uint32(1) | ||
capDonID := uint32(2) | ||
// The below state describes a Workflow DON (AcceptsWorkflows = true), | ||
// which exposes the log-event-trigger and write_chain capabilities. | ||
// We expect receivers to be wired up and both capabilities to be added to the registry. | ||
rtc := &capabilities.RemoteTriggerConfig{} | ||
rtc.ApplyDefaults() | ||
|
||
cfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{ | ||
RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{ | ||
RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{ | ||
RegistrationRefresh: durationpb.New(1 * time.Second), | ||
MinResponsesToAggregate: 3, | ||
}, | ||
}, | ||
}) | ||
require.NoError(t, err) | ||
|
||
state := ®istrysyncer.LocalRegistry{ | ||
IDsToDONs: map[registrysyncer.DonID]registrysyncer.DON{ | ||
registrysyncer.DonID(dID): { | ||
DON: capabilities.DON{ | ||
ID: dID, | ||
ConfigVersion: uint32(0), | ||
F: uint8(1), | ||
IsPublic: true, | ||
AcceptsWorkflows: true, | ||
Members: workflowDonNodes, | ||
}, | ||
}, | ||
registrysyncer.DonID(capDonID): { | ||
DON: capabilities.DON{ | ||
ID: capDonID, | ||
ConfigVersion: uint32(0), | ||
F: uint8(1), | ||
IsPublic: true, | ||
AcceptsWorkflows: false, | ||
Members: capabilityDonNodes, | ||
}, | ||
CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{ | ||
fullTriggerCapID: { | ||
Config: cfg, | ||
}, | ||
fullTargetID: { | ||
Config: cfg, | ||
}, | ||
}, | ||
}, | ||
}, | ||
IDsToCapabilities: map[string]registrysyncer.Capability{ | ||
fullTriggerCapID: { | ||
ID: fullTriggerCapID, | ||
CapabilityType: capabilities.CapabilityTypeTrigger, | ||
}, | ||
fullTargetID: { | ||
ID: fullTargetID, | ||
CapabilityType: capabilities.CapabilityTypeTarget, | ||
}, | ||
}, | ||
IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ | ||
capabilityDonNodes[0]: { | ||
NodeOperatorId: 1, | ||
Signer: randomWord(), | ||
P2pId: capabilityDonNodes[0], | ||
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, | ||
}, | ||
capabilityDonNodes[1]: { | ||
NodeOperatorId: 1, | ||
Signer: randomWord(), | ||
P2pId: capabilityDonNodes[1], | ||
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, | ||
}, | ||
capabilityDonNodes[2]: { | ||
NodeOperatorId: 1, | ||
Signer: randomWord(), | ||
P2pId: capabilityDonNodes[2], | ||
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, | ||
}, | ||
capabilityDonNodes[3]: { | ||
NodeOperatorId: 1, | ||
Signer: randomWord(), | ||
P2pId: capabilityDonNodes[3], | ||
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, | ||
}, | ||
workflowDonNodes[0]: { | ||
NodeOperatorId: 1, | ||
Signer: randomWord(), | ||
P2pId: workflowDonNodes[0], | ||
}, | ||
workflowDonNodes[1]: { | ||
NodeOperatorId: 1, | ||
Signer: randomWord(), | ||
P2pId: workflowDonNodes[1], | ||
}, | ||
workflowDonNodes[2]: { | ||
NodeOperatorId: 1, | ||
Signer: randomWord(), | ||
P2pId: workflowDonNodes[2], | ||
}, | ||
workflowDonNodes[3]: { | ||
NodeOperatorId: 1, | ||
Signer: randomWord(), | ||
P2pId: workflowDonNodes[3], | ||
}, | ||
}, | ||
} | ||
|
||
launcher := NewLauncher( | ||
lggr, | ||
wrapper, | ||
dispatcher, | ||
registry, | ||
) | ||
|
||
dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) | ||
dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*target.client")).Return(nil) | ||
awaitRegistrationMessageCh := make(chan struct{}) | ||
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { | ||
select { | ||
case awaitRegistrationMessageCh <- struct{}{}: | ||
default: | ||
} | ||
}) | ||
|
||
err = launcher.Launch(ctx, state) | ||
require.NoError(t, err) | ||
defer launcher.Close() | ||
|
||
baseCapability, err := registry.Get(ctx, fullTriggerCapID) | ||
require.NoError(t, err) | ||
|
||
remoteTriggerSubscriber, ok := baseCapability.(remote.TriggerSubscriber) | ||
require.True(t, ok, "remote trigger capability") | ||
|
||
// Register trigger | ||
workflowID1 := "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" | ||
workflowExecutionID1 := "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed" | ||
req := capabilities.TriggerRegistrationRequest{ | ||
TriggerID: "logeventtrigger_log1", | ||
Metadata: capabilities.RequestMetadata{ | ||
ReferenceID: "logeventtrigger", | ||
WorkflowID: workflowID1, | ||
WorkflowExecutionID: workflowExecutionID1, | ||
}, | ||
} | ||
triggerEventCallbackCh, err := remoteTriggerSubscriber.RegisterTrigger(ctx, req) | ||
require.NoError(t, err) | ||
<-awaitRegistrationMessageCh | ||
|
||
// Receive trigger event | ||
triggerEvent1 := map[string]any{"event": "triggerEvent1"} | ||
triggerEvent2 := map[string]any{"event": "triggerEvent2"} | ||
triggerEventMsg1, triggerEventValue := newTriggerEventMsg(t, capabilityDonNodes[0], workflowID1, triggerEvent1, "TriggerEventID1") | ||
triggerEventMsg2, _ := newTriggerEventMsg(t, capabilityDonNodes[1], workflowID1, triggerEvent1, "TriggerEventID1") | ||
// One Faulty Node (F = 1) sending bad event data for the same TriggerEventID1 | ||
triggerEventMsg3, _ := newTriggerEventMsg(t, capabilityDonNodes[2], workflowID1, triggerEvent2, "TriggerEventID1") | ||
remoteTriggerSubscriber.Receive(ctx, triggerEventMsg1) | ||
remoteTriggerSubscriber.Receive(ctx, triggerEventMsg2) | ||
remoteTriggerSubscriber.Receive(ctx, triggerEventMsg3) | ||
|
||
// After MinResponsesToAggregate, we should get a response | ||
response := <-triggerEventCallbackCh | ||
|
||
// Checks if response is same as minIdenticalResponses = F + 1, F = 1 | ||
require.Equal(t, response.Event.Outputs, triggerEventValue) | ||
} | ||
|
||
func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) { | ||
ctx := tests.Context(t) | ||
lggr := logger.TestLogger(t) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters