Skip to content

Commit

Permalink
Notifications support (#52)
Browse files Browse the repository at this point in the history
implementation of netconf notifications as per proposal in
#51

I've added a couple of TODOs that we may or may not want to address.

Filter parameter implementation may be out of scope and deserve its own
PR, what plans have you for it?

About checks on capabilities I'm not sure you actually want to address
them here, at least I don't see other places where you take capabilities
into account.

---------

Co-authored-by: GCortesi <[email protected]>
  • Loading branch information
GiacomoCortesi and GCortesi authored Jun 12, 2023
1 parent beceee4 commit 8ca892b
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 11 deletions.
43 changes: 43 additions & 0 deletions ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,46 @@ func (s *Session) CancelCommit(ctx context.Context, opts ...CancelCommitOption)
var resp OKResp
return s.Call(ctx, &req, &resp)
}

// CreateSubscriptionOption is a optional arguments to [Session.CreateSubscription] method
type CreateSubscriptionOption interface {
apply(req *createSubscriptionReq)
}

type createSubscriptionReq struct {
XMLName xml.Name `xml:"urn:ietf:params:xml:ns:netconf:notification:1.0 create-subscription"`
Stream string `xml:"stream,omitempty"`
// TODO: Implement filter
//Filter int64 `xml:"filter,omitempty"`
StartTime string `xml:"startTime,omitempty"`
EndTime string `xml:"endTime,omitempty"`
}

type stream string
type startTime time.Time
type endTime time.Time

func (o stream) apply(req *createSubscriptionReq) {
req.Stream = string(o)
}
func (o startTime) apply(req *createSubscriptionReq) {
req.StartTime = time.Time(o).Format(time.RFC3339)
}
func (o endTime) apply(req *createSubscriptionReq) {
req.EndTime = time.Time(o).Format(time.RFC3339)
}

func WithStreamOption(s string) CreateSubscriptionOption { return stream(s) }
func WithStartTimeOption(st time.Time) CreateSubscriptionOption { return startTime(st) }
func WithEndTimeOption(et time.Time) CreateSubscriptionOption { return endTime(et) }

func (s *Session) CreateSubscription(ctx context.Context, opts ...CreateSubscriptionOption) error {
var req createSubscriptionReq
for _, opt := range opts {
opt.apply(&req)
}
// TODO: eventual custom notifications rpc logic, e.g. create subscription only if notification capability is present

var resp OKResp
return s.Call(ctx, &req, &resp)
}
70 changes: 70 additions & 0 deletions ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,3 +537,73 @@ func TestCancelCommit(t *testing.T) {
})
}
}

func TestCreateSubscription(t *testing.T) {
st, err := time.Parse(time.RFC3339, "2023-06-07T18:31:48+02:00")
if err != nil {
t.Fatalf("invalid startTime: %s", st)
}
et, err := time.Parse(time.RFC3339, "2023-06-07T18:33:48+02:00")
if err != nil {
t.Fatalf("invalid endTime: %s", et)
}
tt := []struct {
name string
options []CreateSubscriptionOption
matches []*regexp.Regexp
}{
{
name: "noOptions",
matches: []*regexp.Regexp{
regexp.MustCompile(`<create-subscription></create-subscription>`),
},
},
{
name: "startTime option",
options: []CreateSubscriptionOption{WithStartTimeOption(st)},
matches: []*regexp.Regexp{
regexp.MustCompile(`<create-subscription><startTime>` + regexp.QuoteMeta(st.Format(time.RFC3339)) + `</startTime></create-subscription>`),
},
},
{
name: "endTime option",
options: []CreateSubscriptionOption{WithEndTimeOption(et)},
matches: []*regexp.Regexp{
regexp.MustCompile(`<create-subscription><endTime>` + regexp.QuoteMeta(et.Format(time.RFC3339)) + `</endTime></create-subscription>`),
},
},
{
name: "stream option",
options: []CreateSubscriptionOption{WithStreamOption("thestream")},
matches: []*regexp.Regexp{
regexp.MustCompile(`<create-subscription><stream>thestream</stream></create-subscription>`),
},
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
ts := newTestServer(t)
sess := newSession(ts.transport())
go sess.recv()

ts.queueRespString(`<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="1"><ok/></rpc-reply>`)

err := sess.CreateSubscription(context.Background(), tc.options...)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

sentMsg, err := ts.popReq()
if err != nil {
t.Errorf("failed to read message sent to sever: %v", err)
}

for _, match := range tc.matches {
if !match.Match(sentMsg) {
t.Errorf("sent message didn't match `%s`", match.String())
}
}
})
}
}
54 changes: 43 additions & 11 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
var ErrClosed = errors.New("closed connection")

type sessionConfig struct {
capabilities []string
capabilities []string
notificationHandler NotificationHandler
}

type SessionOption interface {
Expand All @@ -36,20 +37,44 @@ func WithCapability(capabilities ...string) SessionOption {
return capabilityOpt(capabilities)
}

type notificationHandlerOpt NotificationHandler

func (o notificationHandlerOpt) apply(cfg *sessionConfig) {
cfg.notificationHandler = NotificationHandler(o)
}

func WithNotificationHandler(nh NotificationHandler) SessionOption {
return notificationHandlerOpt(nh)
}

// Session is represents a netconf session to a one given device.
type Session struct {
tr transport.Transport
sessionID uint64

clientCaps capabilitySet
serverCaps capabilitySet
clientCaps capabilitySet
serverCaps capabilitySet
notificationHandler NotificationHandler

mu sync.Mutex
seq uint64
reqs map[uint64]*req
closing bool
}

// NotificationHandler function allows to work with received notifications.
// A NotificationHandler function can be passed in as an option when calling Open method of Session object
// A typical use of the NofificationHandler function is to retrieve notifications once they are received so
// that they can be parsed and/or stored somewhere.
// Sample usage:
// func GetNotificationsHandler(c chan string) netconf.NotificationsHandler {
// return func(nm NotificationMsg) {
// // just send the raw notification data to the channel
// c <- nm
// }
//}
type NotificationHandler func(msg NotificationMsg)

func newSession(transport transport.Transport, opts ...SessionOption) *Session {
cfg := sessionConfig{
capabilities: DefaultCapabilities,
Expand All @@ -60,9 +85,10 @@ func newSession(transport transport.Transport, opts ...SessionOption) *Session {
}

s := &Session{
tr: transport,
clientCaps: newCapabilitySet(cfg.capabilities...),
reqs: make(map[uint64]*req),
tr: transport,
clientCaps: newCapabilitySet(cfg.capabilities...),
reqs: make(map[uint64]*req),
notificationHandler: cfg.notificationHandler,
}
return s
}
Expand Down Expand Up @@ -176,16 +202,21 @@ func (s *Session) recvMsg() error {
return err
}

const ncNamespace = "urn:ietf:params:xml:ns:netconf:base:1.0"
const (
ncNamespace = "urn:ietf:params:xml:ns:netconf:base:1.0"
notifNamespace = "urn:ietf:params:xml:ns:netconf:notification:1.0"
)

switch root.Name {
/* Not supported yet. Will implement post beta release
case "notification":
case xml.Name{Space: notifNamespace, Local: "notification"}:
if s.notificationHandler == nil {
return nil
}
var notif NotificationMsg
if err := dec.DecodeElement(&notif, root); err != nil {
log.Printf("failed to decode notification message: %v", err)
return fmt.Errorf("failed to decode notification message: %w", err)
}
*/
s.notificationHandler(notif)
case xml.Name{Space: ncNamespace, Local: "rpc-reply"}:
var reply RPCReplyMsg
if err := dec.DecodeElement(&reply, root); err != nil {
Expand All @@ -206,6 +237,7 @@ func (s *Session) recvMsg() error {
default:
return fmt.Errorf("unknown message type: %q", root.Name.Local)
}
return nil
}

// recv is the main receive loop. It runs concurrently to be able to handle
Expand Down

0 comments on commit 8ca892b

Please sign in to comment.