Skip to content

Commit

Permalink
refactor: fix some of the tests and add a few more
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Apr 18, 2022
1 parent 888bb14 commit 477780d
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 42 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ type DMap interface {

Scan(ctx context.Context, options ...ScanOption) (Iterator, error)

// Destroy flushes the given dmap on the cluster. You should know that there
// Destroy flushes the given DMap on the cluster. You should know that there
// is no global lock on DMaps. So if you call Put/PutEx and Destroy methods
// concurrently on the cluster, Put call may set new values to the dmap.
Destroy(ctx context.Context) error
Expand Down
17 changes: 13 additions & 4 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package olric
import (
"context"
"fmt"

"github.com/buraksezer/olric/internal/protocol"
"github.com/tidwall/redcon"
"strconv"
)

type Route struct {
Expand All @@ -34,8 +34,17 @@ func mapToRoutingTable(slice []interface{}) (RoutingTable, error) {
for _, raw := range slice {
item := raw.([]interface{})
rawPartID, rawPrimaryOwners, rawReplicaOwners := item[0], item[1], item[2]
partID, ok := rawPartID.(int64)
if !ok {
var partID uint64
switch rawPartID.(type) {
case int64:
partID = uint64(rawPartID.(int64))
case string:
raw, err := strconv.ParseUint(rawPartID.(string), 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid partition id: %v: %w", rawPartID, err)
}
partID = raw
default:
return nil, fmt.Errorf("invalid partition id: %v", rawPartID)
}

Expand Down Expand Up @@ -63,7 +72,7 @@ func mapToRoutingTable(slice []interface{}) (RoutingTable, error) {
}
r.ReplicaOwners = append(r.ReplicaOwners, owner)
}
rt[uint64(partID)] = r
rt[partID] = r
}
return rt, nil
}
Expand Down
20 changes: 20 additions & 0 deletions cluster_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@ func TestClusterClient_RoutingTable(t *testing.T) {
require.Len(t, rt, int(db.config.PartitionCount))
}

func TestClusterClient_RoutingTable_Cluster(t *testing.T) {
cluster := newTestOlricCluster(t)
cluster.addMember(t) // Cluster coordinator
<-time.After(250 * time.Millisecond)

db := cluster.addMember(t)

ctx := context.Background()
c, err := NewClusterClient([]string{db.name})
require.NoError(t, err)
defer func() {
require.NoError(t, c.Close(ctx))
}()

rt, err := c.RoutingTable(ctx)
require.NoError(t, err)

require.Len(t, rt, int(db.config.PartitionCount))
}

func TestClusterClient_Put(t *testing.T) {
cluster := newTestOlricCluster(t)
db := cluster.addMember(t)
Expand Down
9 changes: 0 additions & 9 deletions embedded_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,6 @@ func (e *EmbeddedClient) NewPubSub(options ...PubSubOption) (*PubSub, error) {
return newPubSub(e.db.client, options...)
}

func (e *EmbeddedClient) NewPubSubWithAddr(addr string) (*PubSub, error) {
// TODO: Add an error type to Get
rc := e.db.client.Get(addr)
return &PubSub{
rc: rc,
client: e.db.client,
}, nil
}

func (db *Olric) NewEmbeddedClient() *EmbeddedClient {
return &EmbeddedClient{db: db}
}
Expand Down
13 changes: 8 additions & 5 deletions embedded_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func TestEmbeddedClient_DMap_LockWithTimeout_Then_Lease(t *testing.T) {
require.ErrorIs(t, err, ErrLockNotAcquired)
}

func TestEmbeddedClient_RoutingTable(t *testing.T) {
func TestEmbeddedClient_RoutingTable_Standalone(t *testing.T) {
cluster := newTestOlricCluster(t)
db := cluster.addMember(t)

Expand All @@ -503,14 +503,17 @@ func TestEmbeddedClient_RoutingTable(t *testing.T) {

func TestEmbeddedClient_RoutingTable_Cluster(t *testing.T) {
cluster := newTestOlricCluster(t)
db := cluster.addMember(t)
cluster.addMember(t)

cluster.addMember(t) // Cluster coordinator
<-time.After(250 * time.Millisecond)

cluster.addMember(t)
db2 := cluster.addMember(t)

e := db.NewEmbeddedClient()
e := db2.NewEmbeddedClient()
rt, err := e.RoutingTable(context.Background())
require.NoError(t, err)
require.Len(t, rt, int(db.config.PartitionCount))
require.Len(t, rt, int(db2.config.PartitionCount))
owners := make(map[string]struct{})
for _, route := range rt {
for _, owner := range route.PrimaryOwners {
Expand Down
29 changes: 29 additions & 0 deletions get_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ func TestDMap_Get_GetResponse(t *testing.T) {
require.Equal(t, value, scannedValue)
})

t.Run("TTL", func(t *testing.T) {
var value = []byte("olric")
err = dm.Put(ctx, "mykey-byte", value, &dmap.PutConfig{
HasEX: true,
EX: time.Second,
})
require.NoError(t, err)

e, err := dm.Get(ctx, "mykey-byte")
require.NoError(t, err)

gr := &GetResponse{entry: e}
ttl := gr.TTL()
require.Greater(t, ttl, int64(0))
})

t.Run("Timestamp", func(t *testing.T) {
var value = []byte("olric")
err = dm.Put(ctx, "mykey-byte", value, nil)
require.NoError(t, err)

e, err := dm.Get(ctx, "mykey-byte")
require.NoError(t, err)

gr := &GetResponse{entry: e}
timestamp := gr.Timestamp()
require.Greater(t, timestamp, int64(0))
})

t.Run("Int", func(t *testing.T) {
var value = 100
err = dm.Put(ctx, "mykey-Int", value, nil)
Expand Down
15 changes: 5 additions & 10 deletions internal/cluster/routingtable/left_over_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/buraksezer/olric/internal/testutil"
"github.com/buraksezer/olric/internal/testutil/mockfragment"
"github.com/stretchr/testify/require"
)

func TestRoutingTable_LeftOverData(t *testing.T) {
Expand All @@ -29,9 +30,7 @@ func TestRoutingTable_LeftOverData(t *testing.T) {

c1 := testutil.NewConfig()
rt1, err := cluster.addNode(c1)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
require.NoError(t, err)

if !rt1.IsBootstrapped() {
t.Fatalf("The coordinator node cannot be bootstrapped")
Expand All @@ -46,19 +45,15 @@ func TestRoutingTable_LeftOverData(t *testing.T) {

c2 := testutil.NewConfig()
rt2, err := cluster.addNode(c2)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
require.NoError(t, err)

err = testutil.TryWithInterval(10, 100*time.Millisecond, func() error {
if !rt2.IsBootstrapped() {
return errors.New("the second node cannot be bootstrapped")
}
return nil
})
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
require.NoError(t, err)

for partID := uint64(0); partID < c2.PartitionCount; partID++ {
part := rt2.primary.PartitionByID(partID)
Expand All @@ -72,7 +67,7 @@ func TestRoutingTable_LeftOverData(t *testing.T) {
for partID := uint64(0); partID < c1.PartitionCount; partID++ {
part := rt1.primary.PartitionByID(partID)
if len(part.Owners()) != 2 {
t.Fatalf("Expected partition owners count: 2. Got: %d", part.OwnerCount())
t.Fatalf("Expected partition owners count: 2. Got: %d, PartID: %d", part.OwnerCount(), partID)
}
}

Expand Down
6 changes: 3 additions & 3 deletions internal/dmap/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ package dmap
import (
"context"
"encoding/json"
"github.com/buraksezer/olric/events"
"github.com/buraksezer/olric/internal/protocol"
"github.com/tidwall/redcon"
"strconv"
"testing"
"time"

"github.com/buraksezer/olric/events"
"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/testcluster"
"github.com/buraksezer/olric/internal/testutil"
"github.com/stretchr/testify/require"
"github.com/tidwall/redcon"
)

func TestDMap_Balance_Invalid_PartID(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions internal/kvstore/table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ func TestTable_UpdateTTL_Update_LastAccess(t *testing.T) {
lastAccessOne, err := tb.GetLastAccess(hkey)
require.NoError(t, err)

<-time.After(time.Millisecond)

ttl := time.Now().UnixNano() + 1000
e.SetTTL(ttl)

Expand Down
22 changes: 12 additions & 10 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import (
"github.com/stretchr/testify/require"
)

func resetPubSubStats() {
pubsub.SubscribersTotal.Reset()
pubsub.CurrentPSubscribers.Reset()
pubsub.CurrentSubscribers.Reset()
pubsub.PSubscribersTotal.Reset()
pubsub.PublishedTotal.Reset()
}

func TestOlric_Stats(t *testing.T) {
cluster := newTestOlricCluster(t)
db := cluster.addMember(t)
Expand Down Expand Up @@ -108,6 +116,8 @@ func TestOlric_Stats_Cluster(t *testing.T) {
}

func TestStats_PubSub(t *testing.T) {
resetPubSubStats()

cluster := newTestOlricCluster(t)
db := cluster.addMember(t)

Expand All @@ -116,11 +126,7 @@ func TestStats_PubSub(t *testing.T) {

t.Run("Subscribe", func(t *testing.T) {
defer func() {
pubsub.SubscribersTotal.Reset()
pubsub.CurrentPSubscribers.Reset()
pubsub.CurrentSubscribers.Reset()
pubsub.PSubscribersTotal.Reset()
pubsub.PublishedTotal.Reset()
resetPubSubStats()
}()

var subscribers []*redis.PubSub
Expand Down Expand Up @@ -159,11 +165,7 @@ func TestStats_PubSub(t *testing.T) {

t.Run("PSubscribe", func(t *testing.T) {
defer func() {
pubsub.SubscribersTotal.Reset()
pubsub.CurrentPSubscribers.Reset()
pubsub.CurrentSubscribers.Reset()
pubsub.PSubscribersTotal.Reset()
pubsub.PublishedTotal.Reset()
resetPubSubStats()
}()

ps := rc.PSubscribe(ctx, "h?llo")
Expand Down

0 comments on commit 477780d

Please sign in to comment.