Skip to content

Commit

Permalink
fix:nacos-client query service list not found and can't clean invalid…
Browse files Browse the repository at this point in the history
… client beat info (#1326)
  • Loading branch information
chuntaojun authored Feb 3, 2024
1 parent ea1f4f7 commit fcfec62
Show file tree
Hide file tree
Showing 31 changed files with 1,008 additions and 1,025 deletions.
1 change: 0 additions & 1 deletion apiserver/eurekaserver/eureka_suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
_ "github.com/polarismesh/polaris/plugin/healthchecker/redis"
_ "github.com/polarismesh/polaris/plugin/history/logger"
_ "github.com/polarismesh/polaris/plugin/password"
_ "github.com/polarismesh/polaris/plugin/ratelimit/lrurate"
_ "github.com/polarismesh/polaris/plugin/ratelimit/token"
_ "github.com/polarismesh/polaris/plugin/statis/logger"
_ "github.com/polarismesh/polaris/plugin/statis/prometheus"
Expand Down
1 change: 1 addition & 0 deletions apiserver/nacosserver/v1/discover/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (n *DiscoverServer) GetClientServer() (*restful.WebService, error) {
ws.Path("/nacos/v1/ns").Consumes(restful.MIME_JSON, model.MIME).Produces(restful.MIME_JSON)
n.addInstanceAccess(ws)
n.addSystemAccess(ws)
n.AddServiceAccess(ws)
return ws, nil
}

Expand Down
2 changes: 2 additions & 0 deletions apiserver/xdsserverv3/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
rawbuffer "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/raw_buffer/v3"
tlstrans "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -155,6 +156,7 @@ func (cds *CDSBuilder) makeCluster(svcInfo *resource.ServiceInfo,
EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{
ServiceName: name,
EdsConfig: &core.ConfigSource{
ResourceApiVersion: resourcev3.DefaultAPIVersion,
ConfigSourceSpecifier: &core.ConfigSource_Ads{
Ads: &core.AggregatedConfigSource{},
},
Expand Down
102 changes: 68 additions & 34 deletions apiserver/xdsserverv3/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ import (

// EDSBuilder .
type EDSBuilder struct {
svr service.DiscoverServer
}

func (eds *EDSBuilder) Init(svr service.DiscoverServer) {
eds.svr = svr
}

func (eds *EDSBuilder) Generate(option *resource.BuildOption) (interface{}, error) {
Expand Down Expand Up @@ -74,48 +72,84 @@ func (eds *EDSBuilder) makeBoundEndpoints(option *resource.BuildOption,
continue
}

var lbEndpoints []*endpoint.LbEndpoint
var lbEndpoints []*endpoint.LocalityLbEndpoints
if !option.ForceDelete {
for _, instance := range serviceInfo.Instances {
// 处于隔离状态或者权重为0的实例不进行下发
if !resource.IsNormalEndpoint(instance) {
continue
}
ep := &endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: instance.Host.Value,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: instance.Port.Value,
},
},
lbEndpoints = eds.buildServiceEndpoint(serviceInfo)
}

cla := &endpoint.ClusterLoadAssignment{
ClusterName: resource.MakeServiceName(svcKey, direction, option),
Endpoints: lbEndpoints,
}
clusterLoads = append(clusterLoads, cla)
}
return clusterLoads
}

func (eds *EDSBuilder) buildServiceEndpoint(serviceInfo *resource.ServiceInfo) []*endpoint.LocalityLbEndpoints {
locality := map[string]map[string]map[string][]*endpoint.LbEndpoint{}
for _, instance := range serviceInfo.Instances {
// 处于隔离状态或者权重为0的实例不进行下发
if !resource.IsNormalEndpoint(instance) {
continue
}
region := instance.GetLocation().GetRegion().GetValue()
zone := instance.GetLocation().GetZone().GetValue()
campus := instance.GetLocation().GetCampus().GetValue()
if _, ok := locality[region]; !ok {
locality[region] = map[string]map[string][]*endpoint.LbEndpoint{}
}
if _, ok := locality[region][zone]; !ok {
locality[region][zone] = map[string][]*endpoint.LbEndpoint{}
}
if _, ok := locality[region][zone][campus]; !ok {
locality[region][zone][campus] = make([]*endpoint.LbEndpoint, 0, 32)
}
ep := &endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: instance.Host.Value,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: instance.Port.Value,
},
},
},
},
HealthStatus: resource.FormatEndpointHealth(instance),
LoadBalancingWeight: utils.NewUInt32Value(instance.GetWeight().GetValue()),
Metadata: resource.GenEndpointMetaFromPolarisIns(instance),
}
lbEndpoints = append(lbEndpoints, ep)
}
HealthCheckConfig: &endpoint.Endpoint_HealthCheckConfig{
DisableActiveHealthCheck: true,
},
},
},
HealthStatus: resource.FormatEndpointHealth(instance),
LoadBalancingWeight: utils.NewUInt32Value(instance.GetWeight().GetValue()),
Metadata: resource.GenEndpointMetaFromPolarisIns(instance),
}
locality[region][zone][campus] = append(locality[region][zone][campus], ep)
}

cla := &endpoint.ClusterLoadAssignment{
ClusterName: resource.MakeServiceName(svcKey, direction, option),
Endpoints: []*endpoint.LocalityLbEndpoints{
{
retVal := make([]*endpoint.LocalityLbEndpoints, 0, len(serviceInfo.Instances))

for region := range locality {
for zone := range locality[region] {
for campus := range locality[region][zone] {
lbEndpoints := locality[region][zone][campus]
localityLbEndpoints := &endpoint.LocalityLbEndpoints{
Locality: &core.Locality{
Region: region,
Zone: zone,
SubZone: campus,
},
LbEndpoints: lbEndpoints,
},
},
}
retVal = append(retVal, localityLbEndpoints)
}
}
clusterLoads = append(clusterLoads, cla)
}
return clusterLoads
return retVal
}

func (eds *EDSBuilder) makeSelfEndpoint(option *resource.BuildOption) []types.Resource {
Expand Down
20 changes: 20 additions & 0 deletions apiserver/xdsserverv3/resource/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,26 @@ type ServiceInfo struct {
FaultDetectRevision string
}

func (s *ServiceInfo) Equal(o *ServiceInfo) bool {
// 通过 revision 判断
if s.SvcInsRevision != o.SvcInsRevision {
return false
}
if s.SvcRoutingRevision != o.SvcRoutingRevision {
return false
}
if s.SvcRateLimitRevision != o.SvcRateLimitRevision {
return false
}
if s.CircuitBreakerRevision != o.CircuitBreakerRevision {
return false
}
if s.FaultDetectRevision != o.FaultDetectRevision {
return false
}
return true
}

func (s *ServiceInfo) MatchService(ns, name string) bool {
if s.Namespace == ns && s.Name == name {
return true
Expand Down
64 changes: 20 additions & 44 deletions apiserver/xdsserverv3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,16 @@ func (x *XDSServer) startSynTask(ctx context.Context) {
continue
}

cacheServiceInfos := curRegistryInfo[ns]
// 命名空间存在,但是命名空间下的服务有删除情况,需要找出来
for _, info := range infos {
cacheServiceInfos := curRegistryInfo[ns]
if _, ok := cacheServiceInfos[info.ServiceKey]; !ok {
if _, ok := needRemove[ns]; !ok {
needRemove[ns] = make(map[model.ServiceKey]*resource.ServiceInfo)
}
needRemove[ns][info.ServiceKey] = info
if _, ok := cacheServiceInfos[info.ServiceKey]; ok {
continue
}
if _, ok := needRemove[ns]; !ok {
needRemove[ns] = make(map[model.ServiceKey]*resource.ServiceInfo)
}
needRemove[ns][info.ServiceKey] = info
}
}

Expand All @@ -281,10 +281,20 @@ func (x *XDSServer) startSynTask(ctx context.Context) {
needPush[ns] = infos
continue
}

// 判断当前这个空间,是否需要更新配置
if x.checkUpdate(infos, cacheServiceInfos) {
needPush[ns] = infos
for _, info := range infos {
oldSvc, exist := cacheServiceInfos[info.ServiceKey]
// 如果原来的 cache 不存在,直接就是需要推送
showPush := !exist
if exist {
// 如果原来的 cache 存在,这需要在比对下数据是否出现变化
showPush = !info.Equal(oldSvc)
}
if showPush {
if _, ok := needPush[ns]; !ok {
needPush[ns] = make(map[model.ServiceKey]*resource.ServiceInfo)
}
needPush[ns][info.ServiceKey] = info
}
}
}

Expand Down Expand Up @@ -438,40 +448,6 @@ func (x *XDSServer) Generate(needPush, needRemove map[string]map[model.ServiceKe
x.resourceGenerator.Generate(versionLocal, needPush, needRemove)
}

func (x *XDSServer) checkUpdate(curServiceInfo, cacheServiceInfo map[model.ServiceKey]*resource.ServiceInfo) bool {
if len(curServiceInfo) != len(cacheServiceInfo) {
return true
}
for _, info := range curServiceInfo {
find := false
for _, serviceInfo := range cacheServiceInfo {
if info.Name == serviceInfo.Name {
// 通过 revision 判断
if info.SvcInsRevision != serviceInfo.SvcInsRevision {
return true
}
if info.SvcRoutingRevision != serviceInfo.SvcRoutingRevision {
return true
}
if info.SvcRateLimitRevision != serviceInfo.SvcRateLimitRevision {
return true
}
if info.CircuitBreakerRevision != serviceInfo.CircuitBreakerRevision {
return true
}
if info.FaultDetectRevision != serviceInfo.FaultDetectRevision {
return true
}
find = true
}
}
if !find {
return true
}
}
return false
}

func (x *XDSServer) DebugHandlers() []model.DebugHandler {
return []model.DebugHandler{
{
Expand Down
1 change: 0 additions & 1 deletion auth/defaultauth/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
_ "github.com/polarismesh/polaris/plugin/healthchecker/redis"
_ "github.com/polarismesh/polaris/plugin/history/logger"
_ "github.com/polarismesh/polaris/plugin/password"
_ "github.com/polarismesh/polaris/plugin/ratelimit/lrurate"
_ "github.com/polarismesh/polaris/plugin/ratelimit/token"
_ "github.com/polarismesh/polaris/plugin/statis/logger"
_ "github.com/polarismesh/polaris/plugin/statis/prometheus"
Expand Down
4 changes: 4 additions & 0 deletions cache/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,10 @@ func (bc *BaseCache) Close() error {
return nil
}

func (bc *BaseCache) RefreshInterval() time.Duration {
return time.Second
}

type (
// GrayCache 灰度 Cache 接口
GrayCache interface {
Expand Down
37 changes: 22 additions & 15 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (nc *CacheManager) OpenResourceCache(entries ...types.ConfigEntry) error {
return nil
}

// update 缓存更新
func (nc *CacheManager) update() error {
// warmUp 缓存更新
func (nc *CacheManager) warmUp() error {
var wg sync.WaitGroup
entries := nc.needLoad.ToSlice()
for i := range entries {
Expand Down Expand Up @@ -124,25 +124,32 @@ func (nc *CacheManager) Start(ctx context.Context) error {

// 启动的时候,先更新一版缓存
log.Infof("[Cache] cache update now first time")
if err := nc.update(); err != nil {
if err := nc.warmUp(); err != nil {
return err
}
log.Infof("[Cache] cache update done")

// 启动协程,开始定时更新缓存数据
go func() {
ticker := time.NewTicker(nc.GetUpdateCacheInterval())
defer ticker.Stop()

for {
select {
case <-ticker.C:
_ = nc.update()
case <-ctx.Done():
return
}
entries := nc.needLoad.ToSlice()
for i := range entries {
name := entries[i]
index, exist := cacheSet[name]
if !exist {
return fmt.Errorf("cache resource %s not exists", name)
}
}()
go func(c types.Cache) {
ticker := time.NewTicker(nc.GetUpdateCacheInterval())
for {
select {
case <-ticker.C:
_ = c.Update()
case <-ctx.Done():
ticker.Stop()
return
}
}
}(nc.caches[index])
}

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions cache/config/config_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ func (fc *fileCache) Initialize(opt map[string]interface{}) error {
fc.metricsReleaseCount = utils.NewSyncMap[string, *utils.SyncMap[string, uint64]]()
fc.preMetricsFiles = utils.NewAtomicValue[map[string]map[string]struct{}](map[string]map[string]struct{}{})
fc.lastReportTime = utils.NewAtomicValue[time.Time](time.Time{})
valueCache, err := openBoltCache(opt)
valueCache, err := fc.openBoltCache(opt)
if err != nil {
return err
}
fc.valueCache = valueCache
return nil
}

func openBoltCache(opt map[string]interface{}) (*bbolt.DB, error) {
func (fc *fileCache) openBoltCache(opt map[string]interface{}) (*bbolt.DB, error) {
path, _ := opt["cachePath"].(string)
if path == "" {
path = "./data/cache/config"
Expand Down
2 changes: 1 addition & 1 deletion cache/test_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ var (

// TestUpdate only for test
func (nc *CacheManager) TestUpdate() error {
return nc.update()
return nc.warmUp()
}
Loading

0 comments on commit fcfec62

Please sign in to comment.