Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add testAndSet Command #2

Merged
merged 3 commits into from
Jul 8, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,22 @@ func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
}

// Get the path for http request
func (c *SetCommand) GeneratePath() string {
return "set/" + c.Key
// TestAndSet command
type TestAndSetCommand struct {
Key string `json:"key"`
Value string `json:"value"`
PrevValue string `json: prevValue`
ExpireTime time.Time `json:"expireTime"`
}

// The name of the command in the log
func (c *TestAndSetCommand) CommandName() string {
return "testAndSet"
}

// Set the value of key to value
func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex())
}

// Get command
Expand All @@ -57,10 +70,6 @@ func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
return json.Marshal(res)
}

func (c *GetCommand) GeneratePath() string {
return "get/" + c.Key
}

// List command
type ListCommand struct {
Prefix string `json:"prefix"`
Expand Down
4 changes: 4 additions & 0 deletions etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func main() {
raft.RegisterCommand(&SetCommand{})
raft.RegisterCommand(&GetCommand{})
raft.RegisterCommand(&DeleteCommand{})
raft.RegisterCommand(&WatchCommand{})
raft.RegisterCommand(&ListCommand{})
raft.RegisterCommand(&TestAndSetCommand{})

if err := os.MkdirAll(dirPath, 0744); err != nil {
fatal("Unable to create path: %v", err)
Expand Down Expand Up @@ -326,6 +329,7 @@ func startClientTransport(port int, st int) {
http.HandleFunc("/v1/keys/", Multiplexer)
http.HandleFunc("/v1/watch/", WatchHttpHandler)
http.HandleFunc("/v1/list/", ListHttpHandler)
http.HandleFunc("/v1/testAndSet/", TestAndSetHttpHandler)
http.HandleFunc("/master", MasterHttpHandler)

switch st {
Expand Down
29 changes: 29 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,35 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {

}

func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/testAndSet/"):]

debug("[recv] POST http://%v/v1/testAndSet/%s", server.Name(), key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm, this bit of code seems to be copied for every handler. We may want to use something like Gorilla's logging handler:
http://www.gorillatoolkit.org/pkg/handlers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was using mux before, but found some problem it cannot deal with.
I will clean up the codes after a while.
We need both http and https support. I am going to abstract that layer out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the problem with the Mux? We are using it elsewhere so it would be good to know.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fdd6873

Since our keys contain '/'. like we need to handle http://127.0.0.1:4001/keys/foo/foo/foo with /key

I checked the mux at that time, and think do it by myself is more easy.


command := &TestAndSetCommand{}
command.Key = key

command.PrevValue = req.FormValue("prevValue")
command.Value = req.FormValue("value")
strDuration := req.FormValue("ttl")

if strDuration != "" {
duration, err := strconv.Atoi(strDuration)

if err != nil {
warn("raftd: Bad duration: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/raftd/etcd/g

w.WriteHeader(http.StatusInternalServerError)
return
}
command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration))
} else {
command.ExpireTime = time.Unix(0, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't an uninitialized time start out at 0 anyways?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

http://play.golang.org/p/WsKT0YYum_
I tested it before. I may record the uninitialized value and compare to that, but it seems a little strange.

}

excute(command, &w, req)

}

func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/keys/"):]

Expand Down
149 changes: 80 additions & 69 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,75 +212,6 @@ func Set(key string, value string, expireTime time.Time, index uint64) ([]byte,
}
}

// should be used as a go routine to delete the key when it expires
func expire(key string, update chan time.Time, expireTime time.Time) {
duration := expireTime.Sub(time.Now())

for {
select {
// timeout delete the node
case <-time.After(duration):
node, ok := s.Tree.get(key)
if !ok {
return
} else {

s.Tree.delete(key)

resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}

msg, err := json.Marshal(resp)

notify(resp)

// notify the messager
if s.messager != nil && err == nil {

*s.messager <- string(msg)
}

return

}

case updateTime := <-update:
//update duration
// if the node become a permanent one, the go routine is
// not needed
if updateTime.Equal(PERMANENT) {
fmt.Println("permanent")
return
}
// update duration
duration = updateTime.Sub(time.Now())
}
}
}

func updateMap(index uint64, resp *Response) {

if s.ResponseMaxSize == 0 {
return
}

strIndex := strconv.FormatUint(index, 10)
s.ResponseMap[strIndex] = *resp

// unlimited
if s.ResponseMaxSize < 0{
s.ResponseCurrSize++
return
}

if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
s.ResponseStartIndex++
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
} else {
s.ResponseCurrSize++
}
}


// get the value of the key
func Get(key string) Response {
key = "/" + key
Expand Down Expand Up @@ -375,6 +306,86 @@ func Delete(key string, index uint64) ([]byte, error) {
}
}

// set the value of the key to the value if the given prevValue is equal to the value of the key
func TestAndSet(key string, prevValue string, value string, expireTime time.Time, index uint64) ([]byte, error) {
resp := Get(key)

if resp.PrevValue == prevValue {
return Set(key, value, expireTime, index)
} else {
return json.Marshal(resp)
}
}

// should be used as a go routine to delete the key when it expires
func expire(key string, update chan time.Time, expireTime time.Time) {
duration := expireTime.Sub(time.Now())

for {
select {
// timeout delete the node
case <-time.After(duration):
node, ok := s.Tree.get(key)
if !ok {
return
} else {

s.Tree.delete(key)

resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime, 0, s.Index}

msg, err := json.Marshal(resp)

notify(resp)

// notify the messager
if s.messager != nil && err == nil {

*s.messager <- string(msg)
}

return

}

case updateTime := <-update:
//update duration
// if the node become a permanent one, the go routine is
// not needed
if updateTime.Equal(PERMANENT) {
fmt.Println("permanent")
return
}
// update duration
duration = updateTime.Sub(time.Now())
}
}
}

func updateMap(index uint64, resp *Response) {

if s.ResponseMaxSize == 0 {
return
}

strIndex := strconv.FormatUint(index, 10)
s.ResponseMap[strIndex] = *resp

// unlimited
if s.ResponseMaxSize < 0{
s.ResponseCurrSize++
return
}

if s.ResponseCurrSize == uint(s.ResponseMaxSize) {
s.ResponseStartIndex++
delete(s.ResponseMap, strconv.FormatUint(s.ResponseStartIndex, 10))
} else {
s.ResponseCurrSize++
}
}


// save the current state of the storage system
func (s *Store) Save() ([]byte, error) {
b, err := json.Marshal(s)
Expand Down