Skip to content

Commit

Permalink
add watch function and distinguish sensetive and non-sentive command
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Jun 10, 2013
1 parent b9d789f commit 2e679d2
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 45 deletions.
65 changes: 59 additions & 6 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Command interface {
Type() string
GetValue() string
GetKey() string
Sensitive() bool
}

// Set command
Expand All @@ -38,22 +39,26 @@ func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) {
return json.Marshal(res)
}

func (c *SetCommand) GeneratePath() string{
func (c *SetCommand) GeneratePath() string {
return "/set/" + c.Key
}

func (c *SetCommand) Type() string{
func (c *SetCommand) Type() string {
return "POST"
}

func (c *SetCommand) GetValue() string{
func (c *SetCommand) GetValue() string {
return c.Value
}

func (c *SetCommand) GetKey() string{
func (c *SetCommand) GetKey() string {
return c.Key
}

func (c *SetCommand) Sensitive() bool {
return true
}


// Get command
type GetCommand struct {
Expand Down Expand Up @@ -87,6 +92,9 @@ func (c *GetCommand) GetKey() string{
return c.Key
}

func (c *GetCommand) Sensitive() bool {
return false
}

// Delete command
type DeleteCommand struct {
Expand All @@ -98,7 +106,7 @@ func (c *DeleteCommand) CommandName() string {
return "delete"
}

// Set the value of key to value
// Delete the key
func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){
res := s.Delete(c.Key)
return json.Marshal(res)
Expand All @@ -120,7 +128,52 @@ func (c *DeleteCommand) GetKey() string{
return c.Key
}

// joinCommand
func (c *DeleteCommand) Sensitive() bool {
return true
}


// Watch command
type WatchCommand struct {
Key string `json:"key"`
}

//The name of the command in the log
func (c *WatchCommand) CommandName() string {
return "watch"
}

func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){
ch := make(chan Response)

w.add(c.Key, ch)

res := <- ch

return json.Marshal(res)
}

func (c *WatchCommand) GeneratePath() string{
return "/watch/" + c.Key
}

func (c *WatchCommand) Type() string{
return "GET"
}

func (c *WatchCommand) GetValue() string{
return ""
}

func (c *WatchCommand) GetKey() string{
return c.Key
}

func (c *WatchCommand) Sensitive() bool {
return false
}

// JoinCommand
type JoinCommand struct {
Name string `json:"name"`
}
Expand Down
40 changes: 33 additions & 7 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) {
}


func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)

debug("[recv] GET http://%v/watch/%s", server.Name(), vars["key"])

command := &WatchCommand{}
command.Key = vars["key"]

Dispatch(server, command, w)

}

func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
var body []byte
var err error
Expand All @@ -142,15 +154,29 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
for {
// i am the leader, i will take care of the command
if server.State() == "leader" {
fmt.Println("i am leader ", server.Name())
if body, err = server.Do(command); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
if command.Sensitive() {
if body, err = server.Do(command); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
// good to go
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
} else {
fmt.Println("non-sensitive")
if body, err = command.Apply(server); err != nil {
warn("raftd: Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
} else {
// good to go
w.WriteHeader(http.StatusOK)
w.Write(body)
return
w.WriteHeader(http.StatusOK)
w.Write(body)
return
}
}

// redirect the command to the current leader
Expand Down
1 change: 1 addition & 0 deletions raftd.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func main() {
r.HandleFunc("/set/{key}", SetHttpHandler).Methods("POST")
r.HandleFunc("/get/{key}", GetHttpHandler).Methods("GET")
r.HandleFunc("/delete/{key}", DeleteHttpHandler).Methods("GET")
r.HandleFunc("/watch/{key}", WatchHttpHandler).Methods("GET")

http.Handle("/", r)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))
Expand Down
30 changes: 16 additions & 14 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package main
import (
"path"
"encoding/json"
"fmt"
//"fmt"
)

// CONSTANTS
const (
ERROR = -(1 + iota)
ERROR = -1 + iota
SET
GET
DELETE
)

Expand All @@ -18,7 +19,10 @@ type Store struct {
}

type Response struct {
OldValue string `json:oldvalue`
Action int `json:action`
Key string `json:key`
OldValue string `json:oldValue`
NewValue string `json:newValue`
Exist bool `json:exist`
}

Expand All @@ -39,34 +43,32 @@ func createStore() *Store{

// set the key to value, return the old value if the key exists
func (s *Store) Set(key string, value string) Response {
fmt.Println("Store SET")
key = path.Clean(key)

oldValue, ok := s.Nodes[key]

if ok {
s.Nodes[key] = value
w.notify(SET, key, oldValue, value)
return Response{oldValue, true}
w.notify(SET, key, oldValue, value, true)
return Response{SET, key, oldValue, value, true}

} else {
s.Nodes[key] = value
w.notify(SET, key, "", value)
return Response{"", false}
w.notify(SET, key, "", value, false)
return Response{SET, key, "", value, false}
}
}

// get the value of the key
func (s *Store) Get(key string) Response {
fmt.Println("Stroe Get")
key = path.Clean(key)

value, ok := s.Nodes[key]

if ok {
return Response{value, true}
return Response{GET, key, value, value, true}
} else {
return Response{"", false}
return Response{GET, key, "", value, false}
}
}

Expand All @@ -79,11 +81,11 @@ func (s *Store) Delete(key string) Response {
if ok {
delete(s.Nodes, key)

w.notify(DELETE, key, oldValue, "")
w.notify(DELETE, key, oldValue, "", true)

return Response{oldValue, true}
return Response{DELETE, key, oldValue, "", true}
} else {
return Response{"", false}
return Response{DELETE, key, "", "", false}
}
}

Expand Down
24 changes: 8 additions & 16 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,7 @@ import (


type Watcher struct {
chanMap map[string][]chan Notification
}

type Notification struct {
action int
key string
oldValue string
newValue string
chanMap map[string][]chan Response
}

// global watcher
Expand All @@ -29,34 +22,33 @@ func init() {
// create a new watcher
func createWatcher() *Watcher {
w := new(Watcher)
w.chanMap = make(map[string][]chan Notification)
w.chanMap = make(map[string][]chan Response)
return w
}

// register a function with channel and prefix to the watcher
func (w *Watcher) add(prefix string, c chan Notification, f func(chan Notification)) error {
func (w *Watcher) add(prefix string, c chan Response) error {

prefix = path.Clean(prefix)
prefix = "/" + path.Clean(prefix)
fmt.Println("Add ", prefix)

_, ok := w.chanMap[prefix]
if !ok {
w.chanMap[prefix] = make([]chan Notification, 0)
w.chanMap[prefix] = make([]chan Response, 0)
w.chanMap[prefix] = append(w.chanMap[prefix], c)
} else {
w.chanMap[prefix] = append(w.chanMap[prefix], c)
}

fmt.Println(len(w.chanMap[prefix]), "@", prefix)

go f(c)
return nil
}

// notify the watcher a action happened
func (w *Watcher) notify(action int, key string, oldValue string, newValue string) error {
func (w *Watcher) notify(action int, key string, oldValue string, newValue string, exist bool) error {
key = path.Clean(key)

fmt.Println("notify")
segments := strings.Split(key, "/")

currPath := "/"
Expand All @@ -73,7 +65,7 @@ func (w *Watcher) notify(action int, key string, oldValue string, newValue strin
if ok {
fmt.Println("found ", currPath)

n := Notification {action, key, oldValue, newValue}
n := Response {action, key, oldValue, newValue, exist}
// notify all the watchers
for _, c := range chans {
c <- n
Expand Down
6 changes: 4 additions & 2 deletions watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ func TestWatch(t *testing.T) {
// watcher := createWatcher()
c := make(chan Notification)
d := make(chan Notification)
w.add("/", c, say)
w.add("/prefix/", d, say)
w.add("/", c)
go say(c)
w.add("/prefix/", d)
go say(d)
s.Set("/prefix/foo", "bar")
}

Expand Down

0 comments on commit 2e679d2

Please sign in to comment.