Skip to content

Commit

Permalink
Merge remote-tracking branch 'TheCount/feature-repair'
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Burykin committed Nov 18, 2021
2 parents d618510 + e30ea22 commit 6e4c7c4
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 4 deletions.
8 changes: 8 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package goque

import (
"errors"

ldberrors "github.com/syndtr/goleveldb/leveldb/errors"
)

var (
Expand All @@ -21,3 +23,9 @@ var (
// its underlying database.
ErrDBClosed = errors.New("goque: Database is closed")
)

// IsCorrupted returns a boolean indicating whether the error is indicating
// a corruption.
func IsCorrupted(err error) bool {
return ldberrors.IsCorrupted(err)
}
7 changes: 7 additions & 0 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package goque
import (
"os"
"path/filepath"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)

// goqueType defines the type of Goque data structure used.
Expand All @@ -17,6 +20,10 @@ const (
goquePrefixQueue
)

// levelDbOpener is a function type matching both
// leveldb.OpenFile() and leveldb.Recover().
type levelDbOpener func(string, *opt.Options) (*leveldb.DB, error)

// checkGoqueType checks if the type of Goque data structure
// trying to be opened is compatible with the opener type.
//
Expand Down
16 changes: 15 additions & 1 deletion prefix_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,21 @@ type PrefixQueue struct {

// OpenPrefixQueue opens a prefix queue if one exists at the given directory.
// If one does not already exist, a new prefix queue is created.
// If the underlying database is corrupt, an error for which
// IsCorrupted() returns true is returned.
func OpenPrefixQueue(dataDir string) (*PrefixQueue, error) {
return openPrefixQueue(dataDir, leveldb.OpenFile)
}

// RecoverPrefixQueue attempts to recover a corrupt prefix queue.
func RecoverPrefixQueue(dataDir string) (*PrefixQueue, error) {
return openPrefixQueue(dataDir, leveldb.RecoverFile)
}

// openPrefixQueue opens a prefix queue if one exists at the given directory
// using the specified opener.
// If one does not already exist, a new prefix queue is created.
func openPrefixQueue(dataDir string, open levelDbOpener) (*PrefixQueue, error) {
var err error

// Create a new Queue.
Expand All @@ -51,7 +65,7 @@ func OpenPrefixQueue(dataDir string) (*PrefixQueue, error) {
}

// Open database for the prefix queue.
pq.db, err = leveldb.OpenFile(dataDir, nil)
pq.db, err = open(dataDir, nil)
if err != nil {
return nil, err
}
Expand Down
28 changes: 28 additions & 0 deletions prefix_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,34 @@ func TestPrefixQueueOutOfBounds(t *testing.T) {
}
}

func TestPrefixQueueRecover(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
pq, err := OpenPrefixQueue(file)
if err != nil {
t.Error(err)
}
defer pq.Drop()

_, err = pq.EnqueueString("prefix", "value for item")
if err != nil {
t.Error(err)
}

if err = pq.Close(); err != nil {
t.Error(err)
}
if err = os.Remove(file + "/MANIFEST-000000"); err != nil {
t.Error(err)
}

if pq, err = OpenPrefixQueue(file); !IsCorrupted(err) {
t.Errorf("Expected corruption error, got %s", err)
}
if pq, err = RecoverPrefixQueue(file); err != nil {
t.Error(err)
}
}

func BenchmarkPrefixQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down
17 changes: 16 additions & 1 deletion priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,22 @@ type PriorityQueue struct {
// OpenPriorityQueue opens a priority queue if one exists at the given
// directory. If one does not already exist, a new priority queue is
// created.
// If the underlying database is corrupt, an error for which
// IsCorrupted() returns true is returned.
func OpenPriorityQueue(dataDir string, order order) (*PriorityQueue, error) {
return openPriorityQueue(dataDir, order, leveldb.OpenFile)
}

// RecoverPriorityQueue attempts to recover a corrupt priority queue.
func RecoverPriorityQueue(dataDir string, order order) (*PriorityQueue, error) {
return openPriorityQueue(dataDir, order, leveldb.RecoverFile)
}

// openPriorityQueue opens a priority queue if one exists at the given
// directory using the specified opener. If one does not already exist,
// a new priority queue is
// created.
func openPriorityQueue(dataDir string, order order, open levelDbOpener) (*PriorityQueue, error) {
var err error

// Create a new PriorityQueue.
Expand All @@ -62,7 +77,7 @@ func OpenPriorityQueue(dataDir string, order order) (*PriorityQueue, error) {
}

// Open database for the priority queue.
pq.db, err = leveldb.OpenFile(dataDir, nil)
pq.db, err = open(dataDir, nil)
if err != nil {
return pq, err
}
Expand Down
28 changes: 28 additions & 0 deletions priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,34 @@ func TestPriorityQueueOutOfBounds(t *testing.T) {
}
}

func TestPriorityQueueRecover(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
pq, err := OpenPriorityQueue(file, ASC)
if err != nil {
t.Error(err)
}
defer pq.Drop()

_, err = pq.EnqueueString(0, "value for item")
if err != nil {
t.Error(err)
}

if err = pq.Close(); err != nil {
t.Error(err)
}
if err = os.Remove(file + "/MANIFEST-000000"); err != nil {
t.Error(err)
}

if pq, err = OpenPriorityQueue(file, ASC); !IsCorrupted(err) {
t.Errorf("Expected corruption error, got %s", err)
}
if pq, err = RecoverPriorityQueue(file, ASC); err != nil {
t.Error(err)
}
}

func BenchmarkPriorityQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down
16 changes: 15 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,21 @@ type Queue struct {

// OpenQueue opens a queue if one exists at the given directory. If one
// does not already exist, a new queue is created.
// If the underlying database is corrupt, an error for which
// IsCorrupted() returns true is returned.
func OpenQueue(dataDir string) (*Queue, error) {
return openQueue(dataDir, leveldb.OpenFile)
}

// RecoverQueue attempts to recover a corrupt queue.
func RecoverQueue(dataDir string) (*Queue, error) {
return openQueue(dataDir, leveldb.RecoverFile)
}

// openQueue opens a queue if one exists at the given directory
// using the specified opener. If one
// does not already exist, a new queue is created.
func openQueue(dataDir string, open levelDbOpener) (*Queue, error) {
var err error

// Create a new Queue.
Expand All @@ -35,7 +49,7 @@ func OpenQueue(dataDir string) (*Queue, error) {
}

// Open database for the queue.
q.db, err = leveldb.OpenFile(dataDir, nil)
q.db, err = open(dataDir, nil)
if err != nil {
return q, err
}
Expand Down
28 changes: 28 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,34 @@ func TestQueueOutOfBounds(t *testing.T) {
}
}

func TestQueueRecover(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
q, err := OpenQueue(file)
if err != nil {
t.Error(err)
}
defer q.Drop()

_, err = q.EnqueueString("value for item")
if err != nil {
t.Error(err)
}

if err = q.Close(); err != nil {
t.Error(err)
}
if err = os.Remove(file + "/MANIFEST-000000"); err != nil {
t.Error(err)
}

if q, err = OpenQueue(file); !IsCorrupted(err) {
t.Errorf("Expected corruption error, got %s", err)
}
if q, err = RecoverQueue(file); err != nil {
t.Error(err)
}
}

func BenchmarkQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down
16 changes: 15 additions & 1 deletion stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,21 @@ type Stack struct {

// OpenStack opens a stack if one exists at the given directory. If one
// does not already exist, a new stack is created.
// If the underlying database is corrupt, an error for which
// IsCorrupted() returns true is returned.
func OpenStack(dataDir string) (*Stack, error) {
return openStack(dataDir, leveldb.OpenFile)
}

// RecoverStack attempts to recover a corrupt stack.
func RecoverStack(dataDir string) (*Stack, error) {
return openStack(dataDir, leveldb.RecoverFile)
}

// openStack opens a stack if one exists at the given directory
// using the specified opener. If one
// does not already exist, a new stack is created.
func openStack(dataDir string, open levelDbOpener) (*Stack, error) {
var err error

// Create a new Stack.
Expand All @@ -35,7 +49,7 @@ func OpenStack(dataDir string) (*Stack, error) {
}

// Open database for the stack.
s.db, err = leveldb.OpenFile(dataDir, nil)
s.db, err = open(dataDir, nil)
if err != nil {
return s, err
}
Expand Down
28 changes: 28 additions & 0 deletions stack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,34 @@ func TestStackOutOfBounds(t *testing.T) {
}
}

func TestStackRecover(t *testing.T) {
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
s, err := OpenStack(file)
if err != nil {
t.Error(err)
}
defer s.Drop()

_, err = s.PushString("value for item")
if err != nil {
t.Error(err)
}

if err = s.Close(); err != nil {
t.Error(err)
}
if err = os.Remove(file + "/MANIFEST-000000"); err != nil {
t.Error(err)
}

if s, err = OpenStack(file); !IsCorrupted(err) {
t.Errorf("Expected corruption error, got %s", err)
}
if s, err = RecoverStack(file); err != nil {
t.Error(err)
}
}

func BenchmarkStackPush(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down

0 comments on commit 6e4c7c4

Please sign in to comment.