Skip to content

Commit

Permalink
Merge pull request #2 from xiangli-cmu/master
Browse files Browse the repository at this point in the history
Add testAndSet Command
  • Loading branch information
xiang90 committed Jul 8, 2013
2 parents cd2355e + 67a06ec commit b43fba6
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 76 deletions.
23 changes: 16 additions & 7 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,22 @@ func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
}

// Get the path for http request
func (c *SetCommand) GeneratePath() string {
return "set/" + c.Key
// TestAndSet command
type TestAndSetCommand struct {
Key string `json:"key"`
Value string `json:"value"`
PrevValue string `json: prevValue`
ExpireTime time.Time `json:"expireTime"`
}

// The name of the command in the log
func (c *TestAndSetCommand) CommandName() string {
return "testAndSet"
}

// Set the value of key to value
func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex())
}

// Get command
Expand All @@ -57,10 +70,6 @@ func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
return json.Marshal(res)
}

func (c *GetCommand) GeneratePath() string {
return "get/" + c.Key
}

// List command
type ListCommand struct {
Prefix string `json:"prefix"`
Expand Down
4 changes: 4 additions & 0 deletions etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func main() {
raft.RegisterCommand(&SetCommand{})
raft.RegisterCommand(&GetCommand{})
raft.RegisterCommand(&DeleteCommand{})
raft.RegisterCommand(&WatchCommand{})
raft.RegisterCommand(&ListCommand{})
raft.RegisterCommand(&TestAndSetCommand{})

if err := os.MkdirAll(dirPath, 0744); err != nil {
fatal("Unable to create path: %v", err)
Expand Down Expand Up @@ -326,6 +329,7 @@ func startClientTransport(port int, st int) {
http.HandleFunc("/v1/keys/", Multiplexer)
http.HandleFunc("/v1/watch/", WatchHttpHandler)
http.HandleFunc("/v1/list/", ListHttpHandler)
http.HandleFunc("/v1/testAndSet/", TestAndSetHttpHandler)
http.HandleFunc("/master", MasterHttpHandler)

switch st {
Expand Down
29 changes: 29 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,35 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {

}

func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/testAndSet/"):]

debug("[recv] POST http://%v/v1/testAndSet/%s", server.Name(), key)

command := &TestAndSetCommand{}
command.Key = key

command.PrevValue = req.FormValue("prevValue")
command.Value = req.FormValue("value")
strDuration := req.FormValue("ttl")

if strDuration != "" {
duration, err := strconv.Atoi(strDuration)

if err != nil {
warn("raftd: Bad duration: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration))
} else {
command.ExpireTime = time.Unix(0, 0)
}

excute(command, &w, req)

}

func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/keys/"):]

Expand Down
149 changes: 80 additions & 69 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,75 +212,6 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
}
}

// should be used as a go routine to delete the key when it expires
func expire(key string, update chan time.Time, expireTime time.Time) {
duration := expireTime.Sub(time.Now())

for {
select {
// timeout delete the node
case <-time.After(duration):
node, ok := s.Tree.get(key)
if !ok {
return
} else {

s.Tree.delete(key)

resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}

msg, err := json.Marshal(resp)

notify(resp)

// notify the messager
if s.messager != nil && err == nil {

*s.messager <- string(msg)
}

return

}

case updateTime := <-update:
//update duration
// if the node become a permanent one, the go routine is
// not needed
if updateTime.Equal(PERMANENT) {
fmt.Println("permanent")
return
}
// update duration
duration = updateTime.Sub(time.Now())
}
}
}

func updateMap(index uint64, resp *Response) {

if s.ResponseMaxSize == 0 {
return
}

strIndex := strconv.FormatUint(index, 10)
s.ResponseMap[strIndex] = *resp

// unlimited
if s.ResponseMaxSize < 0{
s.ResponseCurrSize++
return
}

if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
s.ResponseStartIndex++
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
} else {
s.ResponseCurrSize++
}
}


// get the value of the key
func Get(key string) Response {
key = "/" + key
Expand Down Expand Up @@ -375,6 +306,86 @@ func Delete(key string, index uint64) ([]byte, error) {
}
}

// set the value of the key to the value if the given prevValue is equal to the value of the key
func TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
resp := Get(key)

if resp.PrevValue == prevValue {
return Set(key, value, expireTime, index)
} else {
return json.Marshal(resp)
}
}

// should be used as a go routine to delete the key when it expires
func expire(key string, update chan time.Time, expireTime time.Time) {
duration := expireTime.Sub(time.Now())

for {
select {
// timeout delete the node
case <-time.After(duration):
node, ok := s.Tree.get(key)
if !ok {
return
} else {

s.Tree.delete(key)

resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}

msg, err := json.Marshal(resp)

notify(resp)

// notify the messager
if s.messager != nil && err == nil {

*s.messager <- string(msg)
}

return

}

case updateTime := <-update:
//update duration
// if the node become a permanent one, the go routine is
// not needed
if updateTime.Equal(PERMANENT) {
fmt.Println("permanent")
return
}
// update duration
duration = updateTime.Sub(time.Now())
}
}
}

func updateMap(index uint64, resp *Response) {

if s.ResponseMaxSize == 0 {
return
}

strIndex := strconv.FormatUint(index, 10)
s.ResponseMap[strIndex] = *resp

// unlimited
if s.ResponseMaxSize < 0{
s.ResponseCurrSize++
return
}

if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
s.ResponseStartIndex++
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
} else {
s.ResponseCurrSize++
}
}


// save the current state of the storage system
func (s *Store) Save() ([]byte, error) {
b, err := json.Marshal(s)
Expand Down

0 comments on commit b43fba6

Please sign in to comment.