Skip to content

Commit

Permalink
ArchiveBatch and DeleteBatch return msg ids (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
Craig Pastro authored Sep 27, 2023
1 parent 62ad344 commit 2f1e103
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 39 deletions.
29 changes: 10 additions & 19 deletions pgmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,6 @@ func (p *PGMQ) Read(ctx context.Context, queue string, vt int64) (*Message, erro
// messages that are returned are made invisible for the duration of the
// visibility timeout (vt) in seconds. If vt is 0 it will be set to the
// default value, vtDefault.
//
// If the queue is empty or all messages are invisible an ErrNoRows error is
// returned.
func (p *PGMQ) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64) ([]*Message, error) {
if vt == 0 {
vt = vtDefault
Expand All @@ -202,10 +199,6 @@ func (p *PGMQ) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs in
msgs = append(msgs, &msg)
}

if len(msgs) == 0 {
return nil, ErrNoRows
}

return msgs, nil
}

Expand Down Expand Up @@ -246,21 +239,20 @@ func (p *PGMQ) Archive(ctx context.Context, queue string, msgID int64) (bool, er
// table by their ids. View messages on the archive table with sql:
//
// SELECT * FROM pgmq.a_<queue_name>_archive;
func (p *PGMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) ([]bool, error) {
func (p *PGMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) ([]int64, error) {
rows, err := p.db.Query(ctx, "SELECT pgmq.archive($1, $2::bigint[])", queue, msgIDs)
if err != nil {
return nil, wrapPostgresError(err)
}
defer rows.Close()

var archived []bool
var archived []int64
for rows.Next() {
var b bool
err = rows.Scan(&b)
if err != nil {
var n int64
if err := rows.Scan(&n); err != nil {
return nil, wrapPostgresError(err)
}
archived = append(archived, b)
archived = append(archived, n)
}

return archived, nil
Expand All @@ -282,21 +274,20 @@ func (p *PGMQ) Delete(ctx context.Context, queue string, msgID int64) (bool, err
// DeleteBatch deletes a batch of messages from the queue by their ids. This
// is a permanent delete and cannot be undone. If you want to retain a log of
// the messages, use the ArchiveBatch method.
func (p *PGMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) ([]bool, error) {
func (p *PGMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) ([]int64, error) {
rows, err := p.db.Query(ctx, "SELECT pgmq.delete($1, $2::bigint[])", queue, msgIDs)
if err != nil {
return nil, wrapPostgresError(err)
}
defer rows.Close()

var deleted []bool
var deleted []int64
for rows.Next() {
var b bool
err = rows.Scan(&b)
if err != nil {
var n int64
if err := rows.Scan(&n); err != nil {
return nil, wrapPostgresError(err)
}
deleted = append(deleted, b)
deleted = append(deleted, n)
}

return deleted, nil
Expand Down
24 changes: 4 additions & 20 deletions pgmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,9 @@ func TestReadBatch(t *testing.T) {
require.Equal(t, testMsg2, msgs[1].Message)

// Visibility timeout will still be in effect.
_, err = q.ReadBatch(ctx, queue, 0, 5)
require.ErrorIs(t, err, ErrNoRows)
}

func TestReadBatchEmptyQueueReturnsNoRows(t *testing.T) {
ctx := context.Background()
queue := t.Name()

err := q.CreateQueue(ctx, queue)
msgs, err = q.ReadBatch(ctx, queue, 0, 5)
require.NoError(t, err)

_, err = q.ReadBatch(ctx, queue, 0, 1)
require.ErrorIs(t, err, ErrNoRows)
require.Empty(t, msgs)
}

func TestPop(t *testing.T) {
Expand Down Expand Up @@ -260,12 +250,9 @@ func TestArchiveBatch(t *testing.T) {
ids, err := q.SendBatch(ctx, queue, []map[string]any{testMsg1, testMsg2})
require.NoError(t, err)

// Add a msgID that definitely does not exist to the end.
ids = append(ids, -1)

archived, err := q.ArchiveBatch(ctx, queue, ids)
require.NoError(t, err)
require.Equal(t, []bool{true, true, false}, archived)
require.Equal(t, ids, archived)

// Let's check that the two messages landed in the archive table.
stmt := fmt.Sprintf("SELECT * FROM pgmq.a_%s", queue)
Expand Down Expand Up @@ -317,12 +304,9 @@ func TestDeleteBatch(t *testing.T) {
ids, err := q.SendBatch(ctx, queue, []map[string]any{testMsg1, testMsg2})
require.NoError(t, err)

// Add a msgID that definitely does not exist to the end.
ids = append(ids, -1)

deleted, err := q.DeleteBatch(ctx, queue, ids)
require.NoError(t, err)
require.EqualValues(t, []bool{true, true, false}, deleted)
require.EqualValues(t, ids, deleted)

_, err = q.Read(ctx, queue, 0)
require.ErrorIs(t, err, ErrNoRows)
Expand Down

0 comments on commit 2f1e103

Please sign in to comment.