diff --git a/command.go b/command.go index aec39c5626b..a43ab43675c 100644 --- a/command.go +++ b/command.go @@ -36,9 +36,22 @@ func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) { return store.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex()) } -// Get the path for http request -func (c *SetCommand) GeneratePath() string { - return "set/" + c.Key +// TestAndSet command +type TestAndSetCommand struct { + Key string `json:"key"` + Value string `json:"value"` + PrevValue string `json: prevValue` + ExpireTime time.Time `json:"expireTime"` +} + +// The name of the command in the log +func (c *TestAndSetCommand) CommandName() string { + return "testAndSet" +} + +// Set the value of key to value +func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { + return store.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex()) } // Get command @@ -57,10 +70,6 @@ func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) { return json.Marshal(res) } -func (c *GetCommand) GeneratePath() string { - return "get/" + c.Key -} - // List command type ListCommand struct { Prefix string `json:"prefix"` diff --git a/etcd.go b/etcd.go index 705dd5bf6cb..813efe3f006 100644 --- a/etcd.go +++ b/etcd.go @@ -137,6 +137,9 @@ func main() { raft.RegisterCommand(&SetCommand{}) raft.RegisterCommand(&GetCommand{}) raft.RegisterCommand(&DeleteCommand{}) + raft.RegisterCommand(&WatchCommand{}) + raft.RegisterCommand(&ListCommand{}) + raft.RegisterCommand(&TestAndSetCommand{}) if err := os.MkdirAll(dirPath, 0744); err != nil { fatal("Unable to create path: %v", err) @@ -326,6 +329,7 @@ func startClientTransport(port int, st int) { http.HandleFunc("/v1/keys/", Multiplexer) http.HandleFunc("/v1/watch/", WatchHttpHandler) http.HandleFunc("/v1/list/", ListHttpHandler) + http.HandleFunc("/v1/testAndSet/", TestAndSetHttpHandler) http.HandleFunc("/master", MasterHttpHandler) switch st { diff --git a/handlers.go b/handlers.go index 89274727890..1c56f5beffe 100644 --- a/handlers.go +++ b/handlers.go @@ -134,6 +134,35 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) { } +func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) { + key := req.URL.Path[len("/v1/testAndSet/"):] + + debug("[recv] POST http://%v/v1/testAndSet/%s", server.Name(), key) + + command := &TestAndSetCommand{} + command.Key = key + + command.PrevValue = req.FormValue("prevValue") + command.Value = req.FormValue("value") + strDuration := req.FormValue("ttl") + + if strDuration != "" { + duration, err := strconv.Atoi(strDuration) + + if err != nil { + warn("raftd: Bad duration: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration)) + } else { + command.ExpireTime = time.Unix(0, 0) + } + + excute(command, &w, req) + +} + func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/v1/keys/"):] diff --git a/store/store.go b/store/store.go index 9f56827a26e..72ff06a5bcf 100644 --- a/store/store.go +++ b/store/store.go @@ -212,75 +212,6 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte, } } -// should be used as a go routine to delete the key when it expires -func expire(key string, update chan time.Time, expireTime time.Time) { - duration := expireTime.Sub(time.Now()) - - for { - select { - // timeout delete the node - case <-time.After(duration): - node, ok := s.Tree.get(key) - if !ok { - return - } else { - - s.Tree.delete(key) - - resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index} - - msg, err := json.Marshal(resp) - - notify(resp) - - // notify the messager - if s.messager != nil && err == nil { - - *s.messager <- string(msg) - } - - return - - } - - case updateTime := <-update: - //update duration - // if the node become a permanent one, the go routine is - // not needed - if updateTime.Equal(PERMANENT) { - fmt.Println("permanent") - return - } - // update duration - duration = updateTime.Sub(time.Now()) - } - } -} - -func updateMap(index uint64, resp *Response) { - - if s.ResponseMaxSize == 0 { - return - } - - strIndex := strconv.FormatUint(index, 10) - s.ResponseMap[strIndex] = *resp - - // unlimited - if s.ResponseMaxSize < 0{ - s.ResponseCurrSize++ - return - } - - if s.ResponseCurrSize == uint(s.ResponseMaxSize) { - s.ResponseStartIndex++ - delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10)) - } else { - s.ResponseCurrSize++ - } -} - - // get the value of the key func Get(key string) Response { key = "/" + key @@ -375,6 +306,86 @@ func Delete(key string, index uint64) ([]byte, error) { } } +// set the value of the key to the value if the given prevValue is equal to the value of the key +func TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) { + resp := Get(key) + + if resp.PrevValue == prevValue { + return Set(key, value, expireTime, index) + } else { + return json.Marshal(resp) + } +} + +// should be used as a go routine to delete the key when it expires +func expire(key string, update chan time.Time, expireTime time.Time) { + duration := expireTime.Sub(time.Now()) + + for { + select { + // timeout delete the node + case <-time.After(duration): + node, ok := s.Tree.get(key) + if !ok { + return + } else { + + s.Tree.delete(key) + + resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index} + + msg, err := json.Marshal(resp) + + notify(resp) + + // notify the messager + if s.messager != nil && err == nil { + + *s.messager <- string(msg) + } + + return + + } + + case updateTime := <-update: + //update duration + // if the node become a permanent one, the go routine is + // not needed + if updateTime.Equal(PERMANENT) { + fmt.Println("permanent") + return + } + // update duration + duration = updateTime.Sub(time.Now()) + } + } +} + +func updateMap(index uint64, resp *Response) { + + if s.ResponseMaxSize == 0 { + return + } + + strIndex := strconv.FormatUint(index, 10) + s.ResponseMap[strIndex] = *resp + + // unlimited + if s.ResponseMaxSize < 0{ + s.ResponseCurrSize++ + return + } + + if s.ResponseCurrSize == uint(s.ResponseMaxSize) { + s.ResponseStartIndex++ + delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10)) + } else { + s.ResponseCurrSize++ + } +} + + // save the current state of the storage system func (s *Store) Save() ([]byte, error) { b, err := json.Marshal(s)