diff --git a/command.go b/command.go index e4399d2e801..75ec921f5f3 100644 --- a/command.go +++ b/command.go @@ -19,6 +19,7 @@ type Command interface { Type() string GetValue() string GetKey() string + Sensitive() bool } // Set command @@ -38,22 +39,26 @@ func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) { return json.Marshal(res) } -func (c *SetCommand) GeneratePath() string{ +func (c *SetCommand) GeneratePath() string { return "/set/" + c.Key } -func (c *SetCommand) Type() string{ +func (c *SetCommand) Type() string { return "POST" } -func (c *SetCommand) GetValue() string{ +func (c *SetCommand) GetValue() string { return c.Value } -func (c *SetCommand) GetKey() string{ +func (c *SetCommand) GetKey() string { return c.Key } +func (c *SetCommand) Sensitive() bool { + return true +} + // Get command type GetCommand struct { @@ -87,6 +92,9 @@ func (c *GetCommand) GetKey() string{ return c.Key } +func (c *GetCommand) Sensitive() bool { + return false +} // Delete command type DeleteCommand struct { @@ -98,7 +106,7 @@ func (c *DeleteCommand) CommandName() string { return "delete" } -// Set the value of key to value +// Delete the key func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){ res := s.Delete(c.Key) return json.Marshal(res) @@ -120,7 +128,52 @@ func (c *DeleteCommand) GetKey() string{ return c.Key } -// joinCommand +func (c *DeleteCommand) Sensitive() bool { + return true +} + + +// Watch command +type WatchCommand struct { + Key string `json:"key"` +} + +//The name of the command in the log +func (c *WatchCommand) CommandName() string { + return "watch" +} + +func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){ + ch := make(chan Response) + + w.add(c.Key, ch) + + res := <- ch + + return json.Marshal(res) +} + +func (c *WatchCommand) GeneratePath() string{ + return "/watch/" + c.Key +} + +func (c *WatchCommand) Type() string{ + return "GET" +} + +func (c *WatchCommand) GetValue() string{ + return "" +} + +func (c *WatchCommand) GetKey() string{ + return c.Key +} + +func (c *WatchCommand) Sensitive() bool { + return false +} + +// JoinCommand type JoinCommand struct { Name string `json:"name"` } diff --git a/handlers.go b/handlers.go index 2eadb41c252..23958f5860c 100644 --- a/handlers.go +++ b/handlers.go @@ -132,6 +132,18 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) { } +func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + + debug("[recv] GET http://%v/watch/%s", server.Name(), vars["key"]) + + command := &WatchCommand{} + command.Key = vars["key"] + + Dispatch(server, command, w) + +} + func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) { var body []byte var err error @@ -142,15 +154,29 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) { for { // i am the leader, i will take care of the command if server.State() == "leader" { - fmt.Println("i am leader ", server.Name()) - if body, err = server.Do(command); err != nil { - warn("raftd: Unable to write file: %v", err) - w.WriteHeader(http.StatusInternalServerError) + if command.Sensitive() { + if body, err = server.Do(command); err != nil { + warn("raftd: Unable to write file: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } else { + // good to go + w.WriteHeader(http.StatusOK) + w.Write(body) + return + } } else { + fmt.Println("non-sensitive") + if body, err = command.Apply(server); err != nil { + warn("raftd: Unable to write file: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } else { // good to go - w.WriteHeader(http.StatusOK) - w.Write(body) - return + w.WriteHeader(http.StatusOK) + w.Write(body) + return + } } // redirect the command to the current leader diff --git a/raftd.go b/raftd.go index c142b1b57b8..08b3b7e13c7 100644 --- a/raftd.go +++ b/raftd.go @@ -138,6 +138,7 @@ func main() { r.HandleFunc("/set/{key}", SetHttpHandler).Methods("POST") r.HandleFunc("/get/{key}", GetHttpHandler).Methods("GET") r.HandleFunc("/delete/{key}", DeleteHttpHandler).Methods("GET") + r.HandleFunc("/watch/{key}", WatchHttpHandler).Methods("GET") http.Handle("/", r) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil)) diff --git a/store.go b/store.go index a973376fc02..8e8a7823011 100644 --- a/store.go +++ b/store.go @@ -3,13 +3,14 @@ package main import ( "path" "encoding/json" - "fmt" + //"fmt" ) // CONSTANTS const ( - ERROR = -(1 + iota) + ERROR = -1 + iota SET + GET DELETE ) @@ -18,7 +19,10 @@ type Store struct { } type Response struct { - OldValue string `json:oldvalue` + Action int `json:action` + Key string `json:key` + OldValue string `json:oldValue` + NewValue string `json:newValue` Exist bool `json:exist` } @@ -39,34 +43,32 @@ func createStore() *Store{ // set the key to value, return the old value if the key exists func (s *Store) Set(key string, value string) Response { - fmt.Println("Store SET") key = path.Clean(key) oldValue, ok := s.Nodes[key] if ok { s.Nodes[key] = value - w.notify(SET, key, oldValue, value) - return Response{oldValue, true} + w.notify(SET, key, oldValue, value, true) + return Response{SET, key, oldValue, value, true} } else { s.Nodes[key] = value - w.notify(SET, key, "", value) - return Response{"", false} + w.notify(SET, key, "", value, false) + return Response{SET, key, "", value, false} } } // get the value of the key func (s *Store) Get(key string) Response { - fmt.Println("Stroe Get") key = path.Clean(key) value, ok := s.Nodes[key] if ok { - return Response{value, true} + return Response{GET, key, value, value, true} } else { - return Response{"", false} + return Response{GET, key, "", value, false} } } @@ -79,11 +81,11 @@ func (s *Store) Delete(key string) Response { if ok { delete(s.Nodes, key) - w.notify(DELETE, key, oldValue, "") + w.notify(DELETE, key, oldValue, "", true) - return Response{oldValue, true} + return Response{DELETE, key, oldValue, "", true} } else { - return Response{"", false} + return Response{DELETE, key, "", "", false} } } diff --git a/watcher.go b/watcher.go index 168f1045cd9..efe90acfec0 100644 --- a/watcher.go +++ b/watcher.go @@ -8,14 +8,7 @@ import ( type Watcher struct { - chanMap map[string][]chan Notification -} - -type Notification struct { - action int - key string - oldValue string - newValue string + chanMap map[string][]chan Response } // global watcher @@ -29,19 +22,19 @@ func init() { // create a new watcher func createWatcher() *Watcher { w := new(Watcher) - w.chanMap = make(map[string][]chan Notification) + w.chanMap = make(map[string][]chan Response) return w } // register a function with channel and prefix to the watcher -func (w *Watcher) add(prefix string, c chan Notification, f func(chan Notification)) error { +func (w *Watcher) add(prefix string, c chan Response) error { - prefix = path.Clean(prefix) + prefix = "/" + path.Clean(prefix) fmt.Println("Add ", prefix) _, ok := w.chanMap[prefix] if !ok { - w.chanMap[prefix] = make([]chan Notification, 0) + w.chanMap[prefix] = make([]chan Response, 0) w.chanMap[prefix] = append(w.chanMap[prefix], c) } else { w.chanMap[prefix] = append(w.chanMap[prefix], c) @@ -49,14 +42,13 @@ func (w *Watcher) add(prefix string, c chan Notification, f func(chan Notificati fmt.Println(len(w.chanMap[prefix]), "@", prefix) - go f(c) return nil } // notify the watcher a action happened -func (w *Watcher) notify(action int, key string, oldValue string, newValue string) error { +func (w *Watcher) notify(action int, key string, oldValue string, newValue string, exist bool) error { key = path.Clean(key) - + fmt.Println("notify") segments := strings.Split(key, "/") currPath := "/" @@ -73,7 +65,7 @@ func (w *Watcher) notify(action int, key string, oldValue string, newValue strin if ok { fmt.Println("found ", currPath) - n := Notification {action, key, oldValue, newValue} + n := Response {action, key, oldValue, newValue, exist} // notify all the watchers for _, c := range chans { c <- n diff --git a/watcher_test.go b/watcher_test.go index ab8003902b1..f7197eb9ccc 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -9,8 +9,10 @@ func TestWatch(t *testing.T) { // watcher := createWatcher() c := make(chan Notification) d := make(chan Notification) - w.add("/", c, say) - w.add("/prefix/", d, say) + w.add("/", c) + go say(c) + w.add("/prefix/", d) + go say(d) s.Set("/prefix/foo", "bar") }