Commit
fix #286. Make number of open shards configurable
pauldix authored and jvshahid committed Mar 3, 2014
1 parent 10fc4f9 commit a09abd4
Showing 4 changed files with 44 additions and 6 deletions.
4 changes: 4 additions & 0 deletions config.toml.sample
@@ -79,6 +79,10 @@ max-open-files = 40
 # and gigabytes, respectively.
 lru-cache-size = "200m"
 
+# The default setting on this is 0, which means unlimited. Set this to something if you want to
+# limit the max number of open files. max-open-files is per shard so this * that will be max.
+max-open-shards = 0
+
 # These options specify how data is sharded across the cluster. There are two
 # shard configurations that have the same knobs: short term and long term.
 # Any series that begins with a capital letter like Exceptions will be written
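To unpack the comment above: max-open-files is LevelDB's per-shard limit, and max-open-shards caps how many shards are held open at once, so the worst-case file-handle usage is roughly their product. As an illustration (values chosen here, not taken from the commit), max-open-shards = 5 with max-open-files = 40 bounds LevelDB at about 5 × 40 = 200 open files, while the default of 0 leaves the number of open shards unlimited.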
4 changes: 4 additions & 0 deletions src/configuration/config.toml
@@ -71,6 +71,10 @@ query-shard-buffer-size = 1000
 # the process
 # max-open-files = 40
 lru-cache-size = "200m"
+# The default setting on this is 0, which means unlimited. Set this to
+# something if you want to limit the max number of open
+# files. max-open-files is per shard so this * that will be max.
+# max-open-shards = 0
 
 # These options specify how data is sharded across the cluster. There are two
 # shard configurations that have the same knobs: short term and long term.
7 changes: 5 additions & 2 deletions src/configuration/configuration.go
@@ -87,8 +87,9 @@ type LoggingConfig struct {
 }
 
 type LevelDbConfiguration struct {
-    MaxOpenFiles int  `toml:"max-open-files"`
-    LruCacheSize size `toml:"lru-cache-size"`
+    MaxOpenFiles  int  `toml:"max-open-files"`
+    LruCacheSize  size `toml:"lru-cache-size"`
+    MaxOpenShards int  `toml:"max-open-shards"`
 }
 
 type ShardingDefinition struct {
@@ -185,6 +186,7 @@ type Configuration struct {
     BindAddress          string
     LevelDbMaxOpenFiles  int
     LevelDbLruCacheSize  int
+    LevelDbMaxOpenShards int
     ShortTermShard       *ShardConfiguration
     LongTermShard        *ShardConfiguration
     ReplicationFactor    int
@@ -258,6 +260,7 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
         BindAddress:          tomlConfiguration.BindAddress,
         LevelDbMaxOpenFiles:  tomlConfiguration.LevelDb.MaxOpenFiles,
         LevelDbLruCacheSize:  tomlConfiguration.LevelDb.LruCacheSize.int,
+        LevelDbMaxOpenShards: tomlConfiguration.LevelDb.MaxOpenShards,
         LongTermShard:        &tomlConfiguration.Sharding.LongTerm,
         ShortTermShard:       &tomlConfiguration.Sharding.ShortTerm,
         ReplicationFactor:    tomlConfiguration.Sharding.ReplicationFactor,
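The toml struct tags above are what connect the new max-open-shards key to LevelDbConfiguration.MaxOpenShards, which parseTomlConfiguration then copies into Configuration.LevelDbMaxOpenShards. A minimal sketch of that decoding step, assuming a BurntSushi-style TOML decoder (the parser actually used by parseTomlConfiguration is not shown in this diff); LruCacheSize is omitted because its custom size type lives elsewhere in the file:

package main

import (
    "fmt"

    "github.com/BurntSushi/toml" // assumed decoder for this sketch
)

// Mirrors the LevelDbConfiguration struct from the diff (LruCacheSize omitted).
type LevelDbConfiguration struct {
    MaxOpenFiles  int `toml:"max-open-files"`
    MaxOpenShards int `toml:"max-open-shards"`
}

func main() {
    sample := `
max-open-files = 40
max-open-shards = 0
`
    var cfg LevelDbConfiguration
    // Decode fills each field whose toml tag matches a key in the document.
    if _, err := toml.Decode(sample, &cfg); err != nil {
        panic(err)
    }
    // 0 means "unlimited", per the comment in config.toml.sample.
    fmt.Printf("max-open-files=%d max-open-shards=%d\n", cfg.MaxOpenFiles, cfg.MaxOpenShards)
}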
35 changes: 31 additions & 4 deletions src/datastore/leveldb_shard_datastore.go
@@ -7,19 +7,23 @@ import (
     "configuration"
     "fmt"
     "github.com/jmhodges/levigo"
+    "math"
     "os"
     "path/filepath"
     "protocol"
     "sync"
+    "time"
 )
 
 type LevelDbShardDatastore struct {
     baseDbDir      string
     config         *configuration.Configuration
     shards         map[uint32]*LevelDbShard
+    lastAccess     map[uint32]int64
     shardsLock     sync.RWMutex
     levelDbOptions *levigo.Options
     writeBuffer    *cluster.WriteBuffer
+    maxOpenShards  int
 }
 
 const (
@@ -81,6 +85,8 @@ func NewLevelDbShardDatastore(config *configuration.Configuration) (*LevelDbShardDatastore, error) {
         config:         config,
         shards:         make(map[uint32]*LevelDbShard),
         levelDbOptions: opts,
+        maxOpenShards:  config.LevelDbMaxOpenShards,
+        lastAccess:     make(map[uint32]int64),
     }, nil
 }
 
@@ -93,16 +99,19 @@ func (self *LevelDbShardDatastore) Close() {
 }
 
 func (self *LevelDbShardDatastore) GetOrCreateShard(id uint32) (cluster.LocalShardDb, error) {
-    self.shardsLock.RLock()
+    now := time.Now().Unix()
+    self.shardsLock.Lock()
+    defer self.shardsLock.Unlock()
     db := self.shards[id]
-    self.shardsLock.RUnlock()
+    self.lastAccess[id] = now
 
     if db != nil {
         return db, nil
     }
 
-    self.shardsLock.Lock()
-    defer self.shardsLock.Unlock()
+    if self.maxOpenShards > 0 && len(self.shards) > self.maxOpenShards {
+        self.closeOldestShard()
+    }
 
     // check to make sure it hasn't been put there between the RUnlock and the Lock
     db = self.shards[id]
@@ -146,6 +155,7 @@ func (self *LevelDbShardDatastore) DeleteShard(shardId uint32) error {
     self.shardsLock.Lock()
     shardDb := self.shards[shardId]
     delete(self.shards, shardId)
+    delete(self.lastAccess, shardId)
     self.shardsLock.Unlock()
 
     if shardDb != nil {
@@ -161,6 +171,23 @@ func (self *LevelDbShardDatastore) shardDir(id uint32) string {
     return filepath.Join(self.baseDbDir, fmt.Sprintf("%.5d", id))
 }
 
+func (self *LevelDbShardDatastore) closeOldestShard() {
+    var oldestId uint32
+    oldestAccess := int64(math.MaxInt64)
+    for id, lastAccess := range self.lastAccess {
+        if lastAccess < oldestAccess {
+            oldestId = id
+            oldestAccess = lastAccess
+        }
+    }
+    shard := self.shards[oldestId]
+    if shard != nil {
+        shard.close()
+    }
+    delete(self.shards, oldestId)
+    delete(self.lastAccess, oldestId)
+}
+
 // // returns true if the point has the correct field id and is
 // // in the given time range
 func isPointInRange(fieldId, startTime, endTime, point []byte) bool {
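Taken together, the datastore changes give LevelDbShardDatastore a simple least-recently-used bound: every call to GetOrCreateShard stamps lastAccess for the shard, and once more than maxOpenShards shards are open, closeOldestShard scans for the smallest timestamp and closes that shard. Below is a stripped-down, self-contained sketch of the same pattern; the shard type, field names, and the limit of 2 are placeholders for illustration, not code from the repository.

package main

import (
    "fmt"
    "math"
    "sync"
    "time"
)

// shard stands in for *LevelDbShard in this sketch.
type shard struct{ id uint32 }

func (s *shard) close() { fmt.Println("closing shard", s.id) }

// datastore mirrors the shards/lastAccess bookkeeping added by this commit.
type datastore struct {
    mu            sync.Mutex
    shards        map[uint32]*shard
    lastAccess    map[uint32]int64
    maxOpenShards int // 0 means unlimited, mirroring the config default
}

func (d *datastore) getOrCreateShard(id uint32) *shard {
    now := time.Now().Unix()
    d.mu.Lock()
    defer d.mu.Unlock()
    d.lastAccess[id] = now
    if s := d.shards[id]; s != nil {
        return s
    }
    // Over the limit: close the shard with the oldest access time first.
    if d.maxOpenShards > 0 && len(d.shards) > d.maxOpenShards {
        d.closeOldestShard()
    }
    s := &shard{id: id}
    d.shards[id] = s
    return s
}

func (d *datastore) closeOldestShard() {
    var oldestId uint32
    oldestAccess := int64(math.MaxInt64)
    // Ties (same second) are broken arbitrarily by map iteration order, as in the original.
    for id, last := range d.lastAccess {
        if last < oldestAccess {
            oldestId, oldestAccess = id, last
        }
    }
    if s := d.shards[oldestId]; s != nil {
        s.close()
    }
    delete(d.shards, oldestId)
    delete(d.lastAccess, oldestId)
}

func main() {
    d := &datastore{
        shards:        make(map[uint32]*shard),
        lastAccess:    make(map[uint32]int64),
        maxOpenShards: 2,
    }
    for _, id := range []uint32{1, 2, 3, 4} {
        d.getOrCreateShard(id)
    }
}

Note that the read path now has to take the write lock, since even a plain lookup mutates lastAccess, and the eviction scan is linear in the number of open shards, which stays cheap when max-open-shards is small.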
