Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: check new Read target when initializing boltdb-shipper store #4681

Merged
merged 6 commits into from
Nov 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,9 @@ fmt-jsonnet:
xargs -n 1 -- jsonnetfmt -i

lint-scripts:
# Ignore https://github.com/koalaman/shellcheck/wiki/SC2312
@find . -name '*.sh' -not -path '*/vendor/*' -print0 | \
xargs -0 -n1 shellcheck -x -o all
xargs -0 -n1 shellcheck -e SC2312 -x -o all


# search for dead link in our documentation.
Expand Down
14 changes: 14 additions & 0 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {

applyFIFOCacheConfig(r)
applyIngesterFinalSleep(r)
applyChunkRetain(r, &defaults)

return nil
}
Expand Down Expand Up @@ -464,3 +465,16 @@ func isMemcacheSet(cfg cortexcache.Config) bool {
func applyIngesterFinalSleep(cfg *ConfigWrapper) {
cfg.Ingester.LifecyclerConfig.FinalSleep = 0 * time.Second
}

// applyChunkRetain is used to set chunk retain based on having an index query cache configured
// We retain chunks for at least as long as the index queries cache TTL. When an index entry is
// cached, any chunks flushed after that won't be in the cached entry. To make sure their data is
// available the RetainPeriod keeps them available in the ingesters live data. We want to retain them
// for at least as long as the TTL on the index queries cache.
func applyChunkRetain(cfg, defaults *ConfigWrapper) {
if !reflect.DeepEqual(cfg.StorageConfig.IndexQueriesCacheConfig, defaults.StorageConfig.IndexQueriesCacheConfig) {
// Set the retain period to the cache validity plus one minute. One minute is arbitrary but leaves some
// buffer to make sure the chunks are there until the index entries expire.
cfg.Ingester.RetainPeriod = cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
}
}
28 changes: 28 additions & 0 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,3 +1303,31 @@ common:
assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store)
})
}

func Test_applyChunkRetain(t *testing.T) {
t.Run("chunk retain is unchanged if no index queries cache is defined", func(t *testing.T) {
yamlContent := ``
config, defaults, err := configWrapperFromYAML(t, yamlContent, nil)
assert.NoError(t, err)
assert.Equal(t, defaults.Ingester.RetainPeriod, config.Ingester.RetainPeriod)
})

t.Run("chunk retain is set to IndexCacheValidity + 1 minute", func(t *testing.T) {
yamlContent := `
storage_config:
index_cache_validity: 10m
index_queries_cache_config:
memcached:
batch_size: 256
parallelism: 10
memcached_client:
consistent_hash: true
host: memcached-index-queries.loki-bigtable.svc.cluster.local
service: memcached-client
`
config, _, err := configWrapperFromYAML(t, yamlContent, nil)
assert.NoError(t, err)
assert.Equal(t, 11*time.Minute, config.Ingester.RetainPeriod)
})

}
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (t *Loki) setupModuleManager() error {
}

// Add IngesterQuerier as a dependency for store when target is either ingester or querier.
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) {
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(Read) {
deps[Store] = append(deps[Store], IngesterQuerier)
}

Expand Down
21 changes: 13 additions & 8 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,23 +337,28 @@ func (t *Loki) initStore() (_ services.Service, err error) {
if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID
switch true {
case t.Cfg.isModuleEnabled(Ingester):
case t.Cfg.isModuleEnabled(Ingester), t.Cfg.isModuleEnabled(Write):
// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
// Use fifo cache for caching index in memory.
// Use fifo cache for caching index in memory, this also significantly helps performance.
t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
EnableFifoCache: true,
Fifocache: cache.FifoCacheConfig{
MaxSizeBytes: "200 MB",
// We snapshot the index in ingesters every minute for reads so reduce the index cache validity by a minute.
// This is usually set in StorageConfig.IndexCacheValidity but since this is exclusively used for caching the index entries,
// I(Sandeep) am setting it here which also helps reduce some CPU cycles and allocations required for
// unmarshalling the cached data to check the expiry.
// This is a small hack to save some CPU cycles.
// We check if the object is still valid after pulling it from cache using the IndexCacheValidity value
// however it has to be deserialized to do so, setting the cache validity to some arbitrary amount less than the
// IndexCacheValidity guarantees the FIFO cache will expire the object first which can be done without
// having to deserialize the object.
Validity: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
},
}
// Force the retain period to be longer than the IndexCacheValidity used in the store, this guarantees we don't
// have query gaps on chunks flushed after an index entry is cached by keeping them retained in the ingester
// and queried as part of live data until the cache TTL expires on the index entry.
t.Cfg.Ingester.RetainPeriod = t.Cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
default:
Expand All @@ -370,7 +375,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
switch true {
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read):
// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
if t.Cfg.Querier.QueryStoreOnly {
break
Expand Down
8 changes: 4 additions & 4 deletions tools/increment_version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,28 @@ a=( "${version//./ }" )

# If version string is missing or has the wrong number of members, show usage message.

if [ ${#a[@]} -ne 3 ]
if [[ ${#a[@]} -ne 3 ]]
then
echo "usage: $(basename "$0") [-Mmp] major.minor.patch"
exit 1
fi

# Increment version numbers as requested.

if [ -n "${major}" ]
if [[ -n "${major}" ]]
then
((a[0]++))
a[1]=0
a[2]=0
fi

if [ -n "${minor}" ]
if [[ -n "${minor}" ]]
then
((a[1]++))
a[2]=0
fi

if [ -n "${patch}" ]
if [[ -n "${patch}" ]]
then
((a[2]++))
fi
Expand Down