Skip to content

Commit

Permalink
lrs: use JSON for locality's String representation (#4135)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jan 11, 2021
1 parent f60ed8a commit 2d61c30
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 40 deletions.
33 changes: 24 additions & 9 deletions xds/internal/balancer/edsbalancer/eds_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,17 @@ func (edsImpl *edsBalancerImpl) handleChildPolicy(name string, config json.RawMe
continue
}
for lid, config := range bgwc.configs {
lidJSON, err := lid.ToString()
if err != nil {
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
continue
}
// TODO: (eds) add support to balancer group to support smoothly
// switching sub-balancers (keep old balancer around until new
// balancer becomes ready).
bgwc.bg.Remove(lid.String())
bgwc.bg.Add(lid.String(), edsImpl.subBalancerBuilder)
bgwc.bg.UpdateClientConnState(lid.String(), balancer.ClientConnState{
bgwc.bg.Remove(lidJSON)
bgwc.bg.Add(lidJSON, edsImpl.subBalancerBuilder)
bgwc.bg.UpdateClientConnState(lidJSON, balancer.ClientConnState{
ResolverState: resolver.State{Addresses: config.addrs},
})
// This doesn't need to manually update picker, because the new
Expand Down Expand Up @@ -282,6 +287,11 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
// One balancer for each locality.

lid := locality.ID
lidJSON, err := lid.ToString()
if err != nil {
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
continue
}
newLocalitiesSet[lid] = struct{}{}

newWeight := locality.Weight
Expand Down Expand Up @@ -316,8 +326,8 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
config, ok := bgwc.configs[lid]
if !ok {
// A new balancer, add it to balancer group and balancer map.
bgwc.stateAggregator.Add(lid.String(), newWeight)
bgwc.bg.Add(lid.String(), edsImpl.subBalancerBuilder)
bgwc.stateAggregator.Add(lidJSON, newWeight)
bgwc.bg.Add(lidJSON, edsImpl.subBalancerBuilder)
config = &localityConfig{
weight: newWeight,
}
Expand All @@ -340,23 +350,28 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup

if weightChanged {
config.weight = newWeight
bgwc.stateAggregator.UpdateWeight(lid.String(), newWeight)
bgwc.stateAggregator.UpdateWeight(lidJSON, newWeight)
rebuildStateAndPicker = true
}

if addrsChanged {
config.addrs = newAddrs
bgwc.bg.UpdateClientConnState(lid.String(), balancer.ClientConnState{
bgwc.bg.UpdateClientConnState(lidJSON, balancer.ClientConnState{
ResolverState: resolver.State{Addresses: newAddrs},
})
}
}

// Delete localities that are removed in the latest response.
for lid := range bgwc.configs {
lidJSON, err := lid.ToString()
if err != nil {
edsImpl.logger.Errorf("failed to marshal LocalityID: %#v, skipping this locality", lid)
continue
}
if _, ok := newLocalitiesSet[lid]; !ok {
bgwc.stateAggregator.Remove(lid.String())
bgwc.bg.Remove(lid.String())
bgwc.stateAggregator.Remove(lidJSON)
bgwc.bg.Remove(lidJSON)
delete(bgwc.configs, lid)
edsImpl.logger.Infof("Locality %v deleted", lid)
rebuildStateAndPicker = true
Expand Down
6 changes: 4 additions & 2 deletions xds/internal/balancer/edsbalancer/eds_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,12 +726,14 @@ func (s) TestEDS_LoadReport(t *testing.T) {
// We expect the 10 picks to be split between the localities since they are
// of equal weight. And since we only mark the picks routed to sc2 as done,
// the picks on sc1 should show up as inProgress.
locality1JSON, _ := locality1.ToString()
locality2JSON, _ := locality2.ToString()
wantStoreData := []*load.Data{{
Cluster: testClusterNames[0],
Service: "",
LocalityStats: map[string]load.LocalityData{
locality1.String(): {RequestStats: load.RequestData{InProgress: 5}},
locality2.String(): {RequestStats: load.RequestData{Succeeded: 5}},
locality1JSON: {RequestStats: load.RequestData{InProgress: 5}},
locality2JSON: {RequestStats: load.RequestData{Succeeded: 5}},
},
}}
for i := 0; i < 10; i++ {
Expand Down
22 changes: 13 additions & 9 deletions xds/internal/balancer/lrs/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/client/load"
)
Expand Down Expand Up @@ -93,7 +92,12 @@ func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if b.lb != nil {
b.lb.Close()
}
b.lb = bb.Build(newCCWrapper(b.cc, b.client.loadStore(), newConfig.Locality), b.buildOpts)
lidJSON, err := newConfig.Locality.ToString()
if err != nil {
return fmt.Errorf("failed to marshal LocalityID: %#v", newConfig.Locality)
}
ccWrapper := newCCWrapper(b.cc, b.client.loadStore(), lidJSON)
b.lb = bb.Build(ccWrapper, b.buildOpts)
}
b.config = newConfig

Expand Down Expand Up @@ -126,20 +130,20 @@ func (b *lrsBalancer) Close() {

type ccWrapper struct {
balancer.ClientConn
loadStore load.PerClusterReporter
localityID *internal.LocalityID
loadStore load.PerClusterReporter
localityIDJSON string
}

func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityID *internal.LocalityID) *ccWrapper {
func newCCWrapper(cc balancer.ClientConn, loadStore load.PerClusterReporter, localityIDJSON string) *ccWrapper {
return &ccWrapper{
ClientConn: cc,
loadStore: loadStore,
localityID: localityID,
ClientConn: cc,
loadStore: loadStore,
localityIDJSON: localityIDJSON,
}
}

func (ccw *ccWrapper) UpdateState(s balancer.State) {
s.Picker = newLoadReportPicker(s.Picker, *ccw.localityID, ccw.loadStore)
s.Picker = newLoadReportPicker(s.Picker, ccw.localityIDJSON, ccw.loadStore)
ccw.ClientConn.UpdateState(s)
}

Expand Down
3 changes: 2 additions & 1 deletion xds/internal/balancer/lrs/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func TestLoadReporting(t *testing.T) {
if sd.Cluster != testClusterName || sd.Service != testServiceName {
t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
}
localityData, ok := sd.LocalityStats[testLocality.String()]
testLocalityJSON, _ := testLocality.ToString()
localityData, ok := sd.LocalityStats[testLocalityJSON]
if !ok {
t.Fatalf("loads for %v not found in store", testLocality)
}
Expand Down
5 changes: 2 additions & 3 deletions xds/internal/balancer/lrs/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package lrs
import (
orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/xds/internal"
)

const (
Expand All @@ -43,10 +42,10 @@ type loadReportPicker struct {
loadStore loadReporter
}

func newLoadReportPicker(p balancer.Picker, id internal.LocalityID, loadStore loadReporter) *loadReportPicker {
func newLoadReportPicker(p balancer.Picker, id string, loadStore loadReporter) *loadReportPicker {
return &loadReportPicker{
p: p,
locality: id.String(),
locality: id,
loadStore: loadStore,
}
}
Expand Down
31 changes: 15 additions & 16 deletions xds/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package internal

import (
"encoding/json"
"fmt"
"strings"
)

type clientID string
Expand All @@ -41,23 +41,22 @@ type LocalityID struct {
SubZone string `json:"subZone,omitempty"`
}

// String generates a string representation of LocalityID by adding ":" between
// the components of the LocalityID.
func (l LocalityID) String() string {
return fmt.Sprintf("%s:%s:%s", l.Region, l.Zone, l.SubZone)
// ToString generates a string representation of LocalityID by marshalling it into
// json. Not calling it String() so printf won't call it.
func (l LocalityID) ToString() (string, error) {
b, err := json.Marshal(l)
if err != nil {
return "", err
}
return string(b), nil
}

// LocalityIDFromString converts a string representation of locality, of the
// form region:zone:sub-zone (as generated by the above String() method), into a
// LocalityIDFromString converts a json representation of locality, into a
// LocalityID struct.
func LocalityIDFromString(l string) (LocalityID, error) {
parts := strings.Split(l, ":")
if len(parts) != 3 {
return LocalityID{}, fmt.Errorf("%s is not a well formatted locality ID", l)
func LocalityIDFromString(s string) (ret LocalityID, _ error) {
err := json.Unmarshal([]byte(s), &ret)
if err != nil {
return LocalityID{}, fmt.Errorf("%s is not a well formatted locality ID, error: %v", s, err)
}
return LocalityID{
Region: parts[0],
Zone: parts[1],
SubZone: parts[2],
}, nil
return ret, nil
}
45 changes: 45 additions & 0 deletions xds/internal/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,48 @@ func (s) TestLocalityMatchProtoMessage(t *testing.T) {
t.Fatalf("internal type and proto message have different fields: (-got +want):\n%+v", diff)
}
}

func TestLocalityToAndFromJSON(t *testing.T) {
tests := []struct {
name string
localityID LocalityID
str string
wantErr bool
}{
{
name: "3 fields",
localityID: LocalityID{Region: "r:r", Zone: "z#z", SubZone: "s^s"},
str: `{"region":"r:r","zone":"z#z","subZone":"s^s"}`,
},
{
name: "2 fields",
localityID: LocalityID{Region: "r:r", Zone: "z#z"},
str: `{"region":"r:r","zone":"z#z"}`,
},
{
name: "1 field",
localityID: LocalityID{Region: "r:r"},
str: `{"region":"r:r"}`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotStr, err := tt.localityID.ToString()
if err != nil {
t.Errorf("failed to marshal LocalityID: %#v", tt.localityID)
}
if gotStr != tt.str {
t.Errorf("%#v.String() = %q, want %q", tt.localityID, gotStr, tt.str)
}

gotID, err := LocalityIDFromString(tt.str)
if (err != nil) != tt.wantErr {
t.Errorf("LocalityIDFromString(%q) error = %v, wantErr %v", tt.str, err, tt.wantErr)
return
}
if diff := cmp.Diff(gotID, tt.localityID); diff != "" {
t.Errorf("LocalityIDFromString() got = %v, want %v, diff: %s", gotID, tt.localityID, diff)
}
})
}
}

0 comments on commit 2d61c30

Please sign in to comment.