Skip to content

Commit

Permalink
Memberlist now forwards only changes, not full original received mess…
Browse files Browse the repository at this point in the history
…age (cortexproject#4419)

* Memberlist now forwards only changes, not full original received message.

Signed-off-by: Peter Štibraný <[email protected]>

* CHANGELOG.md entry

Signed-off-by: Peter Štibraný <[email protected]>

* Ignore linter here.

Signed-off-by: Peter Štibraný <[email protected]>
Signed-off-by: Alvin Lin <[email protected]>
  • Loading branch information
pstibrany authored and alvinlin123 committed Jan 14, 2022
1 parent 6a778ea commit 06b0e97
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Users only have control of the HTTP header when Cortex is not frontend by an auth proxy validating the tenant IDs
* [CHANGE] Some files and directories created by Mimir components on local disk now have stricter permissions, and are only readable by owner, but not group or others. #4394
* [CHANGE] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328
* [CHANGE] Memberlist: forward only changes, not entire original message. #4419
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
* [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342
Expand Down
16 changes: 2 additions & 14 deletions pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,20 +980,8 @@ func (m *KV) NotifyMsg(msg []byte) {
} else if version > 0 {
m.notifyWatchers(kvPair.Key)

m.addSentMessage(message{
Time: time.Now(),
Size: len(msg),
Pair: kvPair,
Version: version,
Changes: changes,
})

// Forward this message
// Memberlist will modify message once this function returns, so we need to make a copy
msgCopy := append([]byte(nil), msg...)

// forward this message further
m.queueBroadcast(kvPair.Key, mod.MergeContent(), version, msgCopy)
// Don't resend original message, but only changes.
m.broadcastNewValue(kvPair.Key, mod, version, codec)
}
}

Expand Down
94 changes: 94 additions & 0 deletions pkg/ring/kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,3 +1074,97 @@ func TestMessageBuffer(t *testing.T) {
assert.Len(t, buf, 2)
assert.Equal(t, size, 75)
}

func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
codec := dataCodec{}

cfg := KVConfig{}
// We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor.
cfg.RetransmitMult = 1
cfg.Codecs = append(cfg.Codecs, codec)

kv := NewKV(cfg, log.NewNopLogger())
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck

client, err := NewClient(kv, codec)
require.NoError(t, err)

// No broadcast messages from KV at the beginning.
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))

now := time.Now()

require.NoError(t, client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) {
d := getOrCreateData(in)
d.Members["a"] = member{Timestamp: now.Unix(), State: JOINING}
d.Members["b"] = member{Timestamp: now.Unix(), State: JOINING}
return d, true, nil
}))

// Check that new instance is broadcasted about just once.
assert.Equal(t, 1, len(kv.GetBroadcasts(0, math.MaxInt32)))
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))

kv.NotifyMsg(marshalKeyValuePair(t, key, codec, &data{
Members: map[string]member{
"a": {Timestamp: now.Unix() - 5, State: ACTIVE},
"b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}},
"c": {Timestamp: now.Unix(), State: ACTIVE},
}}))

// Check two things here:
// 1) state of value in KV store
// 2) broadcast message only has changed members

d := getData(t, client, key)
assert.Equal(t, &data{
Members: map[string]member{
"a": {Timestamp: now.Unix(), State: JOINING, Tokens: []uint32{}}, // unchanged, timestamp too old
"b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}},
"c": {Timestamp: now.Unix(), State: ACTIVE, Tokens: []uint32{}},
}}, d)

bs := kv.GetBroadcasts(0, math.MaxInt32)
require.Equal(t, 1, len(bs))

d = decodeDataFromMarshalledKeyValuePair(t, bs[0], key, codec)
assert.Equal(t, &data{
Members: map[string]member{
// "a" is not here, because it wasn't changed by the message.
"b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}},
"c": {Timestamp: now.Unix(), State: ACTIVE},
}}, d)
}

func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data {
kvp := KeyValuePair{}
require.NoError(t, kvp.Unmarshal(marshalledKVP))
require.Equal(t, key, kvp.Key)

val, err := codec.Decode(kvp.Value)
require.NoError(t, err)
d, ok := val.(*data)
require.True(t, ok)
return d
}

func marshalKeyValuePair(t *testing.T, key string, codec codec.Codec, value interface{}) []byte {
data, err := codec.Encode(value)
require.NoError(t, err)

kvp := KeyValuePair{Key: key, Codec: codec.CodecID(), Value: data}
data, err = kvp.Marshal()
require.NoError(t, err)
return data
}

func getOrCreateData(in interface{}) *data {
// Modify value that was passed as a parameter.
// Client takes care of concurrent modifications.
r, ok := in.(*data)
if !ok || r == nil {
return &data{Members: map[string]member{}}
}
return r
}

0 comments on commit 06b0e97

Please sign in to comment.