diff --git a/redis/redis.go b/redis/redis.go new file mode 100644 index 0000000..4b9ff32 --- /dev/null +++ b/redis/redis.go @@ -0,0 +1,85 @@ +package redis + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/fzzy/radix/redis" + + datastore "github.com/jbenet/go-datastore" + query "github.com/jbenet/go-datastore/query" +) + +var _ datastore.Datastore = &Datastore{} +var _ datastore.ThreadSafeDatastore = &Datastore{} + +var ErrInvalidType = errors.New("redis datastore: invalid type error. this datastore only supports []byte values") + +func NewExpiringDatastore(client *redis.Client, ttl time.Duration) (datastore.ThreadSafeDatastore, error) { + return &Datastore{ + client: client, + ttl: ttl, + }, nil +} + +func NewDatastore(client *redis.Client) (datastore.ThreadSafeDatastore, error) { + return &Datastore{ + client: client, + }, nil +} + +type Datastore struct { + mu sync.Mutex + client *redis.Client + ttl time.Duration +} + +func (ds *Datastore) Put(key datastore.Key, value interface{}) error { + ds.mu.Lock() + defer ds.mu.Unlock() + + data, ok := value.([]byte) + if !ok { + return ErrInvalidType + } + + ds.client.Append("SET", key.String(), data) + if ds.ttl != 0 { + ds.client.Append("EXPIRE", key.String(), ds.ttl.Seconds()) + } + if err := ds.client.GetReply().Err; err != nil { + return fmt.Errorf("failed to put value: %s", err) + } + if ds.ttl != 0 { + if err := ds.client.GetReply().Err; err != nil { + return fmt.Errorf("failed to set expiration: %s", err) + } + } + return nil +} + +func (ds *Datastore) Get(key datastore.Key) (value interface{}, err error) { + ds.mu.Lock() + defer ds.mu.Unlock() + return ds.client.Cmd("GET", key.String()).Bytes() +} + +func (ds *Datastore) Has(key datastore.Key) (exists bool, err error) { + ds.mu.Lock() + defer ds.mu.Unlock() + return ds.client.Cmd("EXISTS", key.String()).Bool() +} + +func (ds *Datastore) Delete(key datastore.Key) (err error) { + ds.mu.Lock() + defer ds.mu.Unlock() + return ds.client.Cmd("DEL", key.String()).Err +} + +func (ds *Datastore) Query(q query.Query) (query.Results, error) { + return nil, errors.New("TODO implement query for redis datastore?") +} + +func (ds *Datastore) IsThreadSafe() {} diff --git a/redis/redis_test.go b/redis/redis_test.go new file mode 100644 index 0000000..0b3e273 --- /dev/null +++ b/redis/redis_test.go @@ -0,0 +1,109 @@ +package redis + +import ( + "bytes" + "os" + "testing" + "time" + + "github.com/fzzy/radix/redis" + datastore "github.com/jbenet/go-datastore" + + dstest "github.com/jbenet/go-datastore/test" +) + +const RedisEnv = "REDIS_DATASTORE_TEST_HOST" + +func TestPutGetBytes(t *testing.T) { + client := clientOrAbort(t) + ds, err := NewDatastore(client) + if err != nil { + t.Fatal(err) + } + key, val := datastore.NewKey("foo"), []byte("bar") + dstest.Nil(ds.Put(key, val), t) + v, err := ds.Get(key) + if err != nil { + t.Fatal(err) + } + if bytes.Compare(v.([]byte), val) != 0 { + t.Fail() + } +} + +func TestHasBytes(t *testing.T) { + client := clientOrAbort(t) + ds, err := NewDatastore(client) + if err != nil { + t.Fatal(err) + } + key, val := datastore.NewKey("foo"), []byte("bar") + has, err := ds.Has(key) + if err != nil { + t.Fatal(err) + } + if has { + t.Fail() + } + + dstest.Nil(ds.Put(key, val), t) + hasAfterPut, err := ds.Has(key) + if err != nil { + t.Fatal(err) + } + if !hasAfterPut { + t.Fail() + } +} + +func TestDelete(t *testing.T) { + client := clientOrAbort(t) + ds, err := NewDatastore(client) + if err != nil { + t.Fatal(err) + } + key, val := datastore.NewKey("foo"), []byte("bar") + dstest.Nil(ds.Put(key, val), t) + dstest.Nil(ds.Delete(key), t) + + hasAfterDelete, err := ds.Has(key) + if err != nil { + t.Fatal(err) + } + if hasAfterDelete { + t.Fail() + } +} + +func TestExpiry(t *testing.T) { + ttl := 1 * time.Second + client := clientOrAbort(t) + ds, err := NewExpiringDatastore(client, ttl) + if err != nil { + t.Fatal(err) + } + key, val := datastore.NewKey("foo"), []byte("bar") + dstest.Nil(ds.Put(key, val), t) + time.Sleep(ttl + 1*time.Second) + dstest.Nil(ds.Delete(key), t) + + hasAfterExpiration, err := ds.Has(key) + if err != nil { + t.Fatal(err) + } + if hasAfterExpiration { + t.Fail() + } +} + +func clientOrAbort(t *testing.T) *redis.Client { + c, err := redis.Dial("tcp", os.Getenv(RedisEnv)) + if err != nil { + t.Log("could not connect to a redis instance") + t.SkipNow() + } + if err := c.Cmd("FLUSHALL").Err; err != nil { + t.Fatal(err) + } + return c +} diff --git a/test/assert.go b/test/assert.go new file mode 100644 index 0000000..9cf7e65 --- /dev/null +++ b/test/assert.go @@ -0,0 +1,25 @@ +package dstest + +import "testing" + +func Nil(err error, t *testing.T, msgs ...string) { + if err != nil { + t.Fatal(msgs, "error:", err) + } +} + +func True(v bool, t *testing.T, msgs ...string) { + if !v { + t.Fatal(msgs) + } +} + +func False(v bool, t *testing.T, msgs ...string) { + True(!v, t, msgs...) +} + +func Err(err error, t *testing.T, msgs ...string) { + if err == nil { + t.Fatal(msgs, "error:", err) + } +}