Skip to content

Commit

Permalink
Merge branch 'm3db:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
gandhikrishna authored Jun 23, 2023
2 parents ce93af8 + 7d7c940 commit eb07a85
Show file tree
Hide file tree
Showing 3 changed files with 407 additions and 59 deletions.
91 changes: 62 additions & 29 deletions src/dbnode/environment/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"os"
"sort"
"time"

clusterclient "github.com/m3db/m3/src/cluster/client"
Expand Down Expand Up @@ -366,32 +367,24 @@ func (c Configuration) configureStatic(cfgParams ConfigurationParameters) (Confi

nsInitStatic := namespace.NewStaticInitializer(nsList)

shardSet, hostShardSets, err := newStaticShardSet(cluster.TopologyConfig.Shards, cluster.TopologyConfig.Hosts)
numReplicas := cluster.TopologyConfig.Replicas
if numReplicas == 0 {
numReplicas = 1
}

shardSet, hostShardSets, err := newStaticShardSet(
cluster.TopologyConfig.Shards,
numReplicas,
cluster.TopologyConfig.Hosts,
)
if err != nil {
err = fmt.Errorf("unable to create shard set for static config: %v", err)
return emptyConfig, err
}
staticOptions := topology.NewStaticOptions().
SetHostShardSets(hostShardSets).
SetShardSet(shardSet)

numHosts := len(cluster.TopologyConfig.Hosts)
numReplicas := cluster.TopologyConfig.Replicas

switch numReplicas {
case 0:
if numHosts != 1 {
err := fmt.Errorf("number of hosts (%d) must be 1 if replicas is not set", numHosts)
return emptyConfig, err
}
staticOptions = staticOptions.SetReplicas(1)
default:
if numHosts != numReplicas {
err := fmt.Errorf("number of hosts (%d) not equal to number of replicas (%d)", numHosts, numReplicas)
return emptyConfig, err
}
staticOptions = staticOptions.SetReplicas(cluster.TopologyConfig.Replicas)
}
SetShardSet(shardSet).
SetReplicas(numReplicas)

topoInit := topology.NewStaticInitializer(staticOptions)
result := ConfigureResult{
Expand All @@ -407,29 +400,69 @@ func (c Configuration) configureStatic(cfgParams ConfigurationParameters) (Confi
return cfgResults, nil
}

func newStaticShardSet(numShards int, hosts []topology.HostShardConfig) (sharding.ShardSet, []topology.HostShardSet, error) {
func newStaticShardSet(
numShards int,
rf int,
hosts []topology.HostShardConfig,
) (sharding.ShardSet, []topology.HostShardSet, error) {
var (
shardSet sharding.ShardSet
hostShardSets []topology.HostShardSet
shardIDs []uint32
err error
shardSet sharding.ShardSet
shardIDs []uint32
err error
)

for i := uint32(0); i < uint32(numShards); i++ {
shardIDs = append(shardIDs, i)
}

shards := sharding.NewShards(shardIDs, shard.Available)
shardSet, err = sharding.NewShardSet(shards, sharding.DefaultHashFn(len(shards)))
shardSet, err = sharding.NewShardSet(shards, sharding.DefaultHashFn(numShards))
if err != nil {
return nil, nil, err
}

for _, i := range hosts {
host := topology.NewHost(i.HostID, i.ListenAddress)
hostShardSets, err := generatePlacement(hosts, numShards, rf)
if err != nil {
return nil, nil, err
}

return shardSet, hostShardSets, nil
}

func generatePlacement(hosts []topology.HostShardConfig, numShards int, rf int) ([]topology.HostShardSet, error) {
numHosts := len(hosts)
if numHosts == 0 || numShards < 1 || rf < 1 {
return nil, errors.New("number of hosts, shards, and RF must be positive")
}
if rf > numHosts {
return nil, errors.New("number of hosts must be >=RF")
}

hostShards := make([][]shard.Shard, numHosts)
hostIdx := 0
// Round robin assign shard replicas to hosts.
for shardInt := uint32(0); shardInt < uint32(numShards); shardInt++ {
for replica := 0; replica < rf; replica++ {
newShard := shard.NewShard(shardInt).SetState(shard.Available)
hostShards[hostIdx] = append(hostShards[hostIdx], newShard)
hostIdx = (hostIdx + 1) % numHosts
}
}

hostShardSets := make([]topology.HostShardSet, 0, numHosts)
sortedHosts := make([]topology.HostShardConfig, numHosts)
// Plain copy is okay because struct just contains strings.
copy(sortedHosts, hosts)
sort.Slice(sortedHosts, func(i, j int) bool { return sortedHosts[i].HostID < sortedHosts[j].HostID })
for i, host := range sortedHosts {
host := topology.NewHost(host.HostID, host.ListenAddress)
shardSet, err := sharding.NewShardSet(hostShards[i], sharding.DefaultHashFn(numShards))
if err != nil {
return nil, fmt.Errorf("error constructing new ShardSet: %w", err)
}
hostShardSet := topology.NewHostShardSet(host, shardSet)
hostShardSets = append(hostShardSets, hostShardSet)
}

return shardSet, hostShardSets, nil
return hostShardSets, nil
}
Loading

0 comments on commit eb07a85

Please sign in to comment.