-
Notifications
You must be signed in to change notification settings - Fork 0
/
observe.go
107 lines (86 loc) · 2.44 KB
/
observe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
package coap
import (
"sync"
)
var observeMap sync.Map
type ObserveCallback func(req *Message, arg interface{}) error
type ObserveNotFoundCallback func(req *Message) bool
type Observation struct {
path string
callback ObserveCallback
arg interface{}
}
func (s *Server) Observe(addr string, code COAPCode, path string, payload []byte, encoding MediaType, callback ObserveCallback, arg interface{}, options *SendOptions) (string, error) {
if options == nil {
options = s.NewOptions()
}
req := &Message{Type: TypeConfirmable, Code: code}
req.WithOption(OptObserve, 0, true)
if len(path) != 0 {
req.WithPathString(path)
}
if encoding != None {
req.WithOption(OptAccept, encoding, true)
}
if payload != nil {
req.WithOption(OptContentFormat, encoding, true)
req.Payload = payload
}
rsp, err := s.Send(addr, req, options)
if err != nil {
return "", err
}
err = RspCodeToError(rsp.Code)
if err != nil {
return "", err
}
observeMap.Store(string(req.Token), &Observation{path: path, callback: callback, arg: arg})
_ = callback(rsp, arg)
return string(req.Token), nil
}
func (s *Server) ObserveCancel(addr string, path string, token string, options *SendOptions) error {
if options == nil {
options = s.NewOptions()
}
req := &Message{Type: TypeConfirmable, Code: CodeGet}
req.WithOption(OptObserve, 1, true)
req.WithPathString(path)
req.Token = []byte(token)
observeMap.Delete(token)
rsp, err := s.Send(addr, req, options)
if err != nil {
return err
}
err = RspCodeToError(rsp.Code)
if err != nil {
return err
}
return nil
}
func ObserveRegister(token string, path string, callback ObserveCallback, arg interface{}) {
observeMap.Store(token, &Observation{path: path, callback: callback, arg: arg})
return
}
func ObserveTokens(callback func(string)) {
observeMap.Range(func(key interface{}, value interface{}) bool {
callback(key.(string))
return true
})
}
func (s *Server) getObserve(msg *Message) *Observation {
c, found := observeMap.Load(string(msg.Token))
if found {
return c.(*Observation)
} else {
if s.config.ObserveNotFoundCallback != nil && s.config.ObserveNotFoundCallback(msg) {
c, found = observeMap.Load(string(msg.Token))
if found {
return c.(*Observation)
}
}
}
return nil
}