Skip to content

Commit

Permalink
fix: wrong new dmap fragment creation criteria
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Jul 10, 2021
1 parent 3538706 commit b28d783
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 89 deletions.
1 change: 1 addition & 0 deletions cmd/olricd/olricd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ olricd:
readRepair: false
replicationMode: 0 # sync mode. for async, set 1
memberCountQuorum: 1
routingTablePushInterval: 10s

storageEngines:
config:
Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ const (
// DefaultStorageEngine denotes the storage engine implementation provided by
// Olric project.
DefaultStorageEngine = "kvstore"

DefaultRoutingTablePushInterval = time.Minute
)

// Config is the configuration to create a Olric instance.
Expand Down Expand Up @@ -166,6 +168,8 @@ type Config struct {
// bootstrapping status without blocking indefinitely.
BootstrapTimeout time.Duration

RoutingTablePushInterval time.Duration

// The list of host:port which are used by memberlist for discovery.
// Don't confuse it with Name.
Peers []string
Expand Down Expand Up @@ -381,6 +385,9 @@ func (c *Config) Sanitize() error {
if c.MaxJoinAttempts == 0 {
c.MaxJoinAttempts = DefaultMaxJoinAttempts
}
if c.RoutingTablePushInterval == 0*time.Second {
c.RoutingTablePushInterval = DefaultRoutingTablePushInterval
}

if c.Client == nil {
c.Client = NewClient()
Expand Down
31 changes: 16 additions & 15 deletions config/internal/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ package loader
import "gopkg.in/yaml.v2"

type olricd struct {
Name string `yaml:"name"`
BindAddr string `yaml:"bindAddr"`
BindPort int `yaml:"bindPort"`
Interface string `yaml:"interface"`
ReplicationMode int `yaml:"replicationMode"`
PartitionCount uint64 `yaml:"partitionCount"`
LoadFactor float64 `yaml:"loadFactor"`
Serializer string `yaml:"serializer"`
KeepAlivePeriod string `yaml:"keepAlivePeriod"`
BootstrapTimeout string `yaml:"bootstrapTimeout"`
ReplicaCount int `yaml:"replicaCount"`
WriteQuorum int `yaml:"writeQuorum"`
ReadQuorum int `yaml:"readQuorum"`
ReadRepair bool `yaml:"readRepair"`
MemberCountQuorum int32 `yaml:"memberCountQuorum"`
Name string `yaml:"name"`
BindAddr string `yaml:"bindAddr"`
BindPort int `yaml:"bindPort"`
Interface string `yaml:"interface"`
ReplicationMode int `yaml:"replicationMode"`
PartitionCount uint64 `yaml:"partitionCount"`
LoadFactor float64 `yaml:"loadFactor"`
Serializer string `yaml:"serializer"`
KeepAlivePeriod string `yaml:"keepAlivePeriod"`
BootstrapTimeout string `yaml:"bootstrapTimeout"`
ReplicaCount int `yaml:"replicaCount"`
WriteQuorum int `yaml:"writeQuorum"`
ReadQuorum int `yaml:"readQuorum"`
ReadRepair bool `yaml:"readRepair"`
MemberCountQuorum int32 `yaml:"memberCountQuorum"`
RoutingTablePushInterval string `yaml:"routingTablePushInterval"`
}

type client struct {
Expand Down
66 changes: 37 additions & 29 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func Load(filename string) (*Config, error) {
return nil, err
}

var joinRetryInterval, keepAlivePeriod, bootstrapTimeout time.Duration
var joinRetryInterval, keepAlivePeriod, bootstrapTimeout, routingTablePushInterval time.Duration
if c.Olricd.KeepAlivePeriod != "" {
keepAlivePeriod, err = time.ParseDuration(c.Olricd.KeepAlivePeriod)
if err != nil {
Expand All @@ -314,6 +314,13 @@ func Load(filename string) (*Config, error) {
c.Memberlist.JoinRetryInterval))
}
}
if c.Olricd.RoutingTablePushInterval != "" {
routingTablePushInterval, err = time.ParseDuration(c.Olricd.RoutingTablePushInterval)
if err != nil {
return nil, errors.WithMessage(err,
fmt.Sprintf("failed to parse olricd.routingTablePushInterval: '%s'", c.Olricd.RoutingTablePushInterval))
}
}

clientConfig := Client{}
err = mapYamlToConfig(&clientConfig, &c.Client)
Expand All @@ -331,34 +338,35 @@ func Load(filename string) (*Config, error) {
storageEngines.Config = c.StorageEngines.Config

cfg := &Config{
BindAddr: c.Olricd.BindAddr,
BindPort: c.Olricd.BindPort,
Interface: c.Olricd.Interface,
ServiceDiscovery: c.ServiceDiscovery,
MemberlistInterface: c.Memberlist.Interface,
MemberlistConfig: memberlistConfig,
Client: &clientConfig,
LogLevel: c.Logging.Level,
JoinRetryInterval: joinRetryInterval,
MaxJoinAttempts: c.Memberlist.MaxJoinAttempts,
Peers: c.Memberlist.Peers,
PartitionCount: c.Olricd.PartitionCount,
ReplicaCount: c.Olricd.ReplicaCount,
WriteQuorum: c.Olricd.WriteQuorum,
ReadQuorum: c.Olricd.ReadQuorum,
ReplicationMode: c.Olricd.ReplicationMode,
ReadRepair: c.Olricd.ReadRepair,
LoadFactor: c.Olricd.LoadFactor,
MemberCountQuorum: c.Olricd.MemberCountQuorum,
Logger: log.New(logOutput, "", log.LstdFlags),
LogOutput: logOutput,
LogVerbosity: c.Logging.Verbosity,
Hasher: hasher.NewDefaultHasher(),
Serializer: sr,
KeepAlivePeriod: keepAlivePeriod,
BootstrapTimeout: bootstrapTimeout,
DMaps: dmapConfig,
StorageEngines: storageEngines,
BindAddr: c.Olricd.BindAddr,
BindPort: c.Olricd.BindPort,
Interface: c.Olricd.Interface,
ServiceDiscovery: c.ServiceDiscovery,
MemberlistInterface: c.Memberlist.Interface,
MemberlistConfig: memberlistConfig,
Client: &clientConfig,
LogLevel: c.Logging.Level,
JoinRetryInterval: joinRetryInterval,
RoutingTablePushInterval: routingTablePushInterval,
MaxJoinAttempts: c.Memberlist.MaxJoinAttempts,
Peers: c.Memberlist.Peers,
PartitionCount: c.Olricd.PartitionCount,
ReplicaCount: c.Olricd.ReplicaCount,
WriteQuorum: c.Olricd.WriteQuorum,
ReadQuorum: c.Olricd.ReadQuorum,
ReplicationMode: c.Olricd.ReplicationMode,
ReadRepair: c.Olricd.ReadRepair,
LoadFactor: c.Olricd.LoadFactor,
MemberCountQuorum: c.Olricd.MemberCountQuorum,
Logger: log.New(logOutput, "", log.LstdFlags),
LogOutput: logOutput,
LogVerbosity: c.Logging.Verbosity,
Hasher: hasher.NewDefaultHasher(),
Serializer: sr,
KeepAlivePeriod: keepAlivePeriod,
BootstrapTimeout: bootstrapTimeout,
DMaps: dmapConfig,
StorageEngines: storageEngines,
}

if err := cfg.Sanitize(); err != nil {
Expand Down
30 changes: 15 additions & 15 deletions internal/cluster/routingtable/routingtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type RoutingTable struct {
discovery *discovery.Discovery
callbacks []func()
callbackMtx sync.Mutex
updatePeriod time.Duration
pushPeriod time.Duration
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand All @@ -88,17 +88,17 @@ func New(e *environment.Environment) *RoutingTable {
Load: c.LoadFactor,
}
return &RoutingTable{
members: newMembers(),
discovery: discovery.New(log, c),
config: c,
log: log,
consistent: consistent.New(nil, cc),
primary: e.Get("primary").(*partitions.Partitions),
backup: e.Get("backup").(*partitions.Partitions),
client: e.Get("client").(*transport.Client),
updatePeriod: time.Minute,
ctx: ctx,
cancel: cancel,
members: newMembers(),
discovery: discovery.New(log, c),
config: c,
log: log,
consistent: consistent.New(nil, cc),
primary: e.Get("primary").(*partitions.Partitions),
backup: e.Get("backup").(*partitions.Partitions),
client: e.Get("client").(*transport.Client),
pushPeriod: c.RoutingTablePushInterval,
ctx: ctx,
cancel: cancel,
}
}

Expand Down Expand Up @@ -294,10 +294,10 @@ func (r *RoutingTable) listenClusterEvents(eventCh chan *discovery.ClusterEvent)
}
}

func (r *RoutingTable) updatePeriodically() {
func (r *RoutingTable) pushPeriodically() {
defer r.wg.Done()

ticker := time.NewTicker(r.updatePeriod)
ticker := time.NewTicker(r.pushPeriod)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -386,7 +386,7 @@ func (r *RoutingTable) Start() error {
}

r.wg.Add(1)
go r.updatePeriodically()
go r.pushPeriodically()

if r.config.MemberlistInterface != "" {
r.log.V(2).Printf("[INFO] Memberlist uses interface: %s", r.config.MemberlistInterface)
Expand Down
9 changes: 1 addition & 8 deletions internal/dmap/delete_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,14 @@
package dmap

import (
"errors"

"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/pkg/neterrors"
)

func (s *Service) deleteOperationCommon(w, r protocol.EncodeDecoder, f func(dm *DMap, r protocol.EncodeDecoder) error) {
req := r.(*protocol.DMapMessage)
dm, err := s.getDMap(req.DMap())
if errors.Is(err, ErrDMapNotFound) {
// we don't even have the DMap. It's just OK.
w.SetStatus(protocol.StatusOK)
return
}
dm, err := s.getOrCreateDMap(req.DMap())
if err != nil {
neterrors.ErrorResponse(w, err)
return
Expand Down
4 changes: 2 additions & 2 deletions internal/dmap/destroy_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

func (s *Service) destroyOperation(w, r protocol.EncodeDecoder) {
req := r.(*protocol.DMapMessage)
dm, err := s.getDMap(req.DMap())
dm, err := s.getOrCreateDMap(req.DMap())
if err != nil {
neterrors.ErrorResponse(w, err)
return
Expand Down Expand Up @@ -55,7 +55,7 @@ func (s *Service) destroyDMapOperation(w, r protocol.EncodeDecoder) {
// This is very similar with rm -rf. Destroys given dmap on the cluster
for partID := uint64(0); partID < s.config.PartitionCount; partID++ {
dm, err := s.getDMap(req.DMap())
if err == ErrDMapNotFound {
if errors.Is(err, ErrDMapNotFound) {
continue
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/dmap/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (s *Service) scanFragmentForEviction(partID uint64, name string, f *fragmen
var maxTotalCount = 100
var totalCount = 0

dm, err := s.getDMap(name)
dm, err := s.getOrCreateDMap(name)
if err != nil {
s.log.V(3).Printf("[ERROR] Failed to load DMap: %s: %v", name, err)
return
Expand Down
9 changes: 1 addition & 8 deletions internal/dmap/expire_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,14 @@
package dmap

import (
"errors"

"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/pkg/neterrors"
)

func (s *Service) expireOperationCommon(w, r protocol.EncodeDecoder, f func(dm *DMap, r protocol.EncodeDecoder) error) {
req := r.(*protocol.DMapMessage)
dm, err := s.getDMap(req.DMap())
if errors.Is(err, ErrDMapNotFound) {
GetMisses.Increase(1)
neterrors.ErrorResponse(w, ErrKeyNotFound)
return
}
dm, err := s.getOrCreateDMap(req.DMap())
if err != nil {
neterrors.ErrorResponse(w, err)
return
Expand Down
4 changes: 2 additions & 2 deletions internal/dmap/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ func (dm *DMap) lookupOnReplicas(hkey uint64, key string) []*version {
v := &version{host: &host}
resp, err := dm.s.requestTo(replica.String(), req)
if err != nil {
if dm.s.log.V(3).Ok() {
dm.s.log.V(3).Printf("[ERROR] Failed to call get on"+
if dm.s.log.V(6).Ok() {
dm.s.log.V(6).Printf("[ERROR] Failed to call get on"+
" a replica owner: %s: %v", replica, err)
}
} else {
Expand Down
9 changes: 1 addition & 8 deletions internal/dmap/get_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package dmap

import (
"errors"

"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/pkg/neterrors"
Expand All @@ -25,12 +23,7 @@ import (

func (s *Service) getOperationCommon(w, r protocol.EncodeDecoder, f func(dm *DMap, r protocol.EncodeDecoder) (storage.Entry, error)) {
req := r.(*protocol.DMapMessage)
dm, err := s.getDMap(req.DMap())
if errors.Is(err, ErrDMapNotFound) {
GetMisses.Increase(1)
neterrors.ErrorResponse(w, ErrKeyNotFound)
return
}
dm, err := s.getOrCreateDMap(req.DMap())
if err != nil {
neterrors.ErrorResponse(w, err)
return
Expand Down
3 changes: 2 additions & 1 deletion internal/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/*Package kvstore implements a GC friendly in-memory storage engine by using map and byte array. It also supports compaction.*/
/*Package kvstore implements a GC friendly in-memory storage engine by using
built-in maps and byte slices. It also supports compaction.*/
package kvstore

import (
Expand Down

0 comments on commit b28d783

Please sign in to comment.