-
Notifications
You must be signed in to change notification settings - Fork 186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds Application Specific RPC Inspector #509
Changes from 25 commits
c93e47a
19b4efd
7c7c05a
c557ad4
5f71a48
1c1c946
ab8365c
5e7a682
b897e22
1c91995
60457b3
6f0858c
3d47fa1
a94ddf3
26f8d34
2e13ee8
1c99052
352d747
40950fe
586c5cb
221ca50
653294a
37df80c
7f56708
e23f9ea
776d859
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -170,6 +170,11 @@ type PubSub struct { | |
protoMatchFunc ProtocolMatchFn | ||
|
||
ctx context.Context | ||
|
||
// appSpecificRpcInspector is an auxiliary that may be set by the application to inspect incoming RPCs prior to | ||
// processing them. The inspector is invoked on an accepted RPC right prior to handling it. | ||
// The return value of the inspector function is a boolean indicating whether the RPC should be processed or not. | ||
appSpecificRpcInspector func(peer.ID, *RPC) bool | ||
} | ||
|
||
// PubSubRouter is the message router component of PubSub. | ||
|
@@ -527,6 +532,13 @@ func WithSeenMessagesTTL(ttl time.Duration) Option { | |
} | ||
} | ||
|
||
func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) bool) Option { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, lets make a type alias for the user inspector fun, returning error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
return func(ps *PubSub) error { | ||
ps.appSpecificRpcInspector = inspector | ||
return nil | ||
} | ||
} | ||
|
||
// processLoop handles all inputs arriving on the channels | ||
func (p *PubSub) processLoop(ctx context.Context) { | ||
defer func() { | ||
|
@@ -1005,6 +1017,14 @@ func (p *PubSub) notifyLeave(topic string, pid peer.ID) { | |
} | ||
|
||
func (p *PubSub) handleIncomingRPC(rpc *RPC) { | ||
// pass the rpc through app specific validation (if any available). | ||
if p.appSpecificRpcInspector != nil { | ||
// check if the RPC is allowed by the external inspector | ||
if accept := p.appSpecificRpcInspector(rpc.from, rpc); !accept { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
return // reject the RPC | ||
} | ||
} | ||
|
||
p.tracer.RecvRPC(rpc) | ||
|
||
subs := rpc.GetSubscriptions() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets return a loggable error to provide some context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
776d859