Skip to content

Commit

Permalink
Fix: abort command returns message to a proper queue/cgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Bogdanovich committed Nov 9, 2015
1 parent f288ffb commit 5b4fd08
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
2 changes: 1 addition & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewSession(conn Conn, repo *repository.QueueRepository) *Controller {
// FinishSession aborts unfinished transaction
func (c *Controller) FinishSession() {
if c.currentValue != nil {
c.abort(c.currentCommand)
c.abort()
}
atomic.AddUint64(&c.repo.Stats.CurrentConnections, ^uint64(0))
}
Expand Down
8 changes: 4 additions & 4 deletions controller/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (c *Controller) Get(input []string) error {
err = c.get(cmd)
}
case "abort":
err = c.abort(cmd)
err = c.abort()
case "peek":
err = c.peek(cmd)
default:
Expand Down Expand Up @@ -82,11 +82,11 @@ func (c *Controller) getClose(cmd *Command) error {
return nil
}

func (c *Controller) abort(cmd *Command) error {
func (c *Controller) abort() error {
if c.currentValue != nil {
q, err := c.getConsumer(cmd)
q, err := c.getConsumer(c.currentCommand)
if err != nil {
log.Println(cmd, err)
log.Println(c.currentCommand, err)
return NewError("ERROR", err)
}
err = q.PutBack(c.currentValue)
Expand Down
42 changes: 41 additions & 1 deletion controller/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func Test_Controller_Gets(t *testing.T) {
// Read one more item from consumer goup (returns nothing)
// Add one item to the original queue
// Read an item from consumer group (returns previously added value)
func Test_Controller_Get_ConsumerGroup(t *testing.T) {
func Test_Controller_ConsumerGroup_Get(t *testing.T) {
repo, controller, mockTCPConn := setupControllerTest(t, 3)
defer cleanupControllerTest(repo)

Expand Down Expand Up @@ -371,3 +371,43 @@ func Test_Controller_Get_ConsumerGroup(t *testing.T) {
assert.Equal(t, "VALUE test 0 1\r\n3\r\nEND\r\n", mockTCPConn.WriteBuffer.String())

}

// Initialize original queue with 3 items
// Open reliable read using consumer group
// Abort the reliable read using
// 1) <queue>/abort
// 2) <queue>:<cursor>/abort syntax
// Make sure same item gets served again
func Test_Controller_ConsumerGroup_GetAbort(t *testing.T) {
repo, controller, mockTCPConn := setupControllerTest(t, 3)
defer cleanupControllerTest(repo)

abortCommands := []string{"test/abort", "test:cgroup/abort"}

for _, abortCommand := range abortCommands {

command := []string{"get", "test:cgroup/open"}
err = controller.Get(command)
assert.NoError(t, err)
assert.Equal(t, "VALUE test 0 1\r\n0\r\nEND\r\n",
mockTCPConn.WriteBuffer.String())

mockTCPConn.WriteBuffer.Reset()

command = []string{"get", abortCommand}
err = controller.Get(command)
assert.NoError(t, err)
assert.Equal(t, "END\r\n", mockTCPConn.WriteBuffer.String())

mockTCPConn.WriteBuffer.Reset()

command = []string{"get", "test:cgroup/peek"}
err = controller.Get(command)
assert.NoError(t, err)
assert.Equal(t, "VALUE test 0 1\r\n0\r\nEND\r\n",
mockTCPConn.WriteBuffer.String())

mockTCPConn.WriteBuffer.Reset()

}
}

0 comments on commit 5b4fd08

Please sign in to comment.