-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
pkg/receive: basic hashring support (#1217)
* go.mod: add xxhash for hashing * pkg/receive: add simple hashring
- Loading branch information
Showing
3 changed files
with
364 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
package receive | ||
|
||
import ( | ||
"errors" | ||
"sort" | ||
|
||
"github.com/improbable-eng/thanos/pkg/store/prompb" | ||
|
||
"github.com/cespare/xxhash" | ||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/prometheus/discovery/targetgroup" | ||
) | ||
|
||
const sep = '\xff' | ||
|
||
// Hashring finds the correct host to handle a given time series | ||
// for a specified tenant. | ||
// It returns the hostname and any error encountered. | ||
type Hashring interface { | ||
GetHost(tenant string, timeSeries *prompb.TimeSeries) (string, error) | ||
} | ||
|
||
// Matcher determines whether or tenant matches a hashring. | ||
type Matcher interface { | ||
Match(tenant string, hashring string) bool | ||
} | ||
|
||
// MultiMatcher is a list of Matchers that implements the Matcher interface. | ||
type MultiMatcher []Matcher | ||
|
||
// Match implements the Matcher interface. | ||
func (m MultiMatcher) Match(tenant, hashring string) bool { | ||
for i := range m { | ||
if !m[i].Match(tenant, hashring) { | ||
return false | ||
} | ||
} | ||
return true | ||
} | ||
|
||
// MatcherFunc is a shim to use a func as a Matcher. | ||
type MatcherFunc func(string, string) bool | ||
|
||
// Match implements the Matcher interface. | ||
func (m MatcherFunc) Match(tenant, hashring string) bool { | ||
return m(tenant, hashring) | ||
} | ||
|
||
// ExactMatcher is a matcher that checks if the tenant exactly matches the hashring name. | ||
var ExactMatcher = MatcherFunc(func(tenant, hashring string) bool { return tenant == hashring }) | ||
|
||
// hash returns a hash for the given tenant and time series. | ||
func hash(tenant string, ts *prompb.TimeSeries) uint64 { | ||
// Sort labelset to ensure a stable hash. | ||
sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name }) | ||
|
||
b := make([]byte, 0, 1024) | ||
b = append(b, []byte(tenant)...) | ||
b = append(b, sep) | ||
for _, v := range ts.Labels { | ||
b = append(b, v.Name...) | ||
b = append(b, sep) | ||
b = append(b, v.Value...) | ||
b = append(b, sep) | ||
} | ||
return xxhash.Sum64(b) | ||
} | ||
|
||
// simpleHashring represents a group of hosts handling write requests. | ||
type simpleHashring struct { | ||
targetgroup.Group | ||
} | ||
|
||
// GetHost returns a hostname to handle the given tenant and time series. | ||
func (s *simpleHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) { | ||
// Always return nil here to implement the Hashring interface. | ||
return string(s.Targets[hash(tenant, ts)%uint64(len(s.Targets))][model.AddressLabel]), nil | ||
} | ||
|
||
// matchingHashring represents a set of hashrings. | ||
// Which hashring to use is determined by the matcher. | ||
type matchingHashring struct { | ||
cache map[string]Hashring | ||
hashrings map[string]Hashring | ||
matcher Matcher | ||
} | ||
|
||
// GetHost returns a hostname to handle the given tenant and time series. | ||
func (m matchingHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) { | ||
if h, ok := m.cache[tenant]; ok { | ||
return h.GetHost(tenant, ts) | ||
} | ||
for name := range m.hashrings { | ||
if m.matcher.Match(tenant, name) { | ||
m.cache[tenant] = m.hashrings[name] | ||
return m.hashrings[name].GetHost(tenant, ts) | ||
} | ||
} | ||
return "", errors.New("no matching hosts to handle tenant") | ||
} | ||
|
||
// NewHashring creates a multi-tenant hashring for a given slice of | ||
// groups. Which tenant's hashring to use is determined by the Matcher. | ||
func NewHashring(matcher Matcher, groups []*targetgroup.Group) Hashring { | ||
m := matchingHashring{ | ||
cache: make(map[string]Hashring), | ||
hashrings: make(map[string]Hashring), | ||
matcher: matcher, | ||
} | ||
for _, g := range groups { | ||
m.hashrings[g.Source] = &simpleHashring{*g} | ||
} | ||
return m | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,249 @@ | ||
package receive | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/improbable-eng/thanos/pkg/store/prompb" | ||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/prometheus/discovery/targetgroup" | ||
) | ||
|
||
func TestHash(t *testing.T) { | ||
ts := &prompb.TimeSeries{ | ||
Labels: []prompb.Label{ | ||
{ | ||
Name: "foo", | ||
Value: "bar", | ||
}, | ||
{ | ||
Name: "baz", | ||
Value: "qux", | ||
}, | ||
}, | ||
} | ||
|
||
ts2 := &prompb.TimeSeries{ | ||
Labels: []prompb.Label{ts.Labels[1], ts.Labels[0]}, | ||
} | ||
|
||
if hash("", ts) != hash("", ts2) { | ||
t.Errorf("expected hashes to be independent of label order") | ||
} | ||
} | ||
|
||
func TestGetHost(t *testing.T) { | ||
ts := &prompb.TimeSeries{ | ||
Labels: []prompb.Label{ | ||
{ | ||
Name: "foo", | ||
Value: "bar", | ||
}, | ||
{ | ||
Name: "baz", | ||
Value: "qux", | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range []struct { | ||
name string | ||
cfg []*targetgroup.Group | ||
hosts map[string]struct{} | ||
tenant string | ||
}{ | ||
{ | ||
name: "empty", | ||
cfg: []*targetgroup.Group{}, | ||
tenant: "tenant1", | ||
}, | ||
{ | ||
name: "simple", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
}, | ||
}, | ||
}, | ||
hosts: map[string]struct{}{"host1": struct{}{}}, | ||
}, | ||
{ | ||
name: "specific", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
}, | ||
Source: "", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
}, | ||
hosts: map[string]struct{}{"host2": struct{}{}}, | ||
tenant: "tenant1", | ||
}, | ||
{ | ||
name: "many tenants", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
}, | ||
Source: "tenant2", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host3", | ||
}, | ||
}, | ||
Source: "tenant3", | ||
}, | ||
}, | ||
hosts: map[string]struct{}{"host1": struct{}{}}, | ||
tenant: "tenant1", | ||
}, | ||
{ | ||
name: "many tenants error", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
}, | ||
Source: "tenant2", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host3", | ||
}, | ||
}, | ||
Source: "tenant3", | ||
}, | ||
}, | ||
tenant: "tenant4", | ||
}, | ||
{ | ||
name: "many hosts", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host3", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host4", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host5", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host6", | ||
}, | ||
}, | ||
Source: "", | ||
}, | ||
}, | ||
hosts: map[string]struct{}{ | ||
"host1": struct{}{}, | ||
"host2": struct{}{}, | ||
"host3": struct{}{}, | ||
}, | ||
tenant: "tenant1", | ||
}, | ||
{ | ||
name: "many hosts 2", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host3", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host4", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host5", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host6", | ||
}, | ||
}, | ||
}, | ||
}, | ||
hosts: map[string]struct{}{ | ||
"host4": struct{}{}, | ||
"host5": struct{}{}, | ||
"host6": struct{}{}, | ||
}, | ||
}, | ||
} { | ||
hs := NewHashring(ExactMatcher, tc.cfg) | ||
h, err := hs.GetHost(tc.tenant, ts) | ||
if tc.hosts != nil { | ||
if err != nil { | ||
t.Errorf("case %q: got unexpected error: %v", tc.name, err) | ||
continue | ||
} | ||
if _, ok := tc.hosts[h]; !ok { | ||
t.Errorf("case %q: got unexpected host %q", tc.name, h) | ||
} | ||
continue | ||
} | ||
if err == nil { | ||
t.Errorf("case %q: expected error", tc.name) | ||
} | ||
} | ||
} |