Skip to content

Commit

Permalink
等待 apiserver 都启动完成,再自注册。 (#785)
Browse files Browse the repository at this point in the history
* 等待 apiserver 都启动完成,再自注册。+ 格式化代码

等待 apiserver 都启动完成,再自注册。

1.完善代码规范
2.将cond.add 放到 apiserver的listen后。

完善代码规范

* import_format 格式化

* 添加Licensed

* 改用waitgroup

Co-authored-by: zhanglei25 <[email protected]>
  • Loading branch information
reallovelei and reallovelei authored Nov 9, 2022
1 parent db611ac commit 9679e45
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 22 deletions.
15 changes: 6 additions & 9 deletions apiserver/eurekaserver/eureka_suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ import (
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/namespace"
"github.com/polarismesh/polaris/plugin"
"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/service/batch"
"github.com/polarismesh/polaris/service/healthcheck"
"github.com/polarismesh/polaris/store"
storemock "github.com/polarismesh/polaris/store/mock"

"github.com/polarismesh/polaris/testdata"

// 注册相关默认插件
_ "github.com/polarismesh/polaris/plugin/cmdb/memory"
_ "github.com/polarismesh/polaris/plugin/discoverevent/local"
_ "github.com/polarismesh/polaris/plugin/discoverstat/discoverlocal"
Expand All @@ -52,6 +43,12 @@ import (
_ "github.com/polarismesh/polaris/plugin/ratelimit/lrurate"
_ "github.com/polarismesh/polaris/plugin/ratelimit/token"
_ "github.com/polarismesh/polaris/plugin/statis/local"
"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/service/batch"
"github.com/polarismesh/polaris/service/healthcheck"
"github.com/polarismesh/polaris/store"
storemock "github.com/polarismesh/polaris/store/mock"
"github.com/polarismesh/polaris/testdata"
)

type Bootstrap struct {
Expand Down
5 changes: 4 additions & 1 deletion apiserver/eurekaserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
"strings"
"time"

"github.com/emicklei/go-restful/v3"
restful "github.com/emicklei/go-restful/v3"
"go.uber.org/zap"

"github.com/polarismesh/polaris/apiserver"
"github.com/polarismesh/polaris/bootstrap"
"github.com/polarismesh/polaris/common/connlimit"
"github.com/polarismesh/polaris/common/secure"
"github.com/polarismesh/polaris/common/utils"
Expand Down Expand Up @@ -253,6 +254,8 @@ func (h *EurekaServer) Run(errCh chan error) {
errCh <- err
return
}
bootstrap.ApiServerWaitGroup.Done()

ln = &tcpKeepAliveListener{ln.(*net.TCPListener)}
// 开启最大连接数限制
if h.connLimitConfig != nil && h.connLimitConfig.OpenConnLimit {
Expand Down
2 changes: 2 additions & 0 deletions apiserver/grpcserver/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/polarismesh/polaris/apiserver"
"github.com/polarismesh/polaris/apiserver/grpcserver"
"github.com/polarismesh/polaris/bootstrap"
api "github.com/polarismesh/polaris/common/api/v1"
commonlog "github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/config"
Expand Down Expand Up @@ -65,6 +66,7 @@ func (g *ConfigGRPCServer) Initialize(ctx context.Context, option map[string]int
// Run 启动GRPC API服务器
func (g *ConfigGRPCServer) Run(errCh chan error) {
g.BaseGrpcServer.Run(errCh, g.GetProtocol(), func(server *grpc.Server) error {
defer bootstrap.ApiServerWaitGroup.Done()
for name, apiConfig := range g.openAPI {
switch name {
case "client":
Expand Down
2 changes: 2 additions & 0 deletions apiserver/grpcserver/discover/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/polarismesh/polaris/apiserver/grpcserver"
v1 "github.com/polarismesh/polaris/apiserver/grpcserver/discover/v1"
v2 "github.com/polarismesh/polaris/apiserver/grpcserver/discover/v2"
"github.com/polarismesh/polaris/bootstrap"
apiv1 "github.com/polarismesh/polaris/common/api/v1"
apiv2 "github.com/polarismesh/polaris/common/api/v2"
commonlog "github.com/polarismesh/polaris/common/log"
Expand Down Expand Up @@ -106,6 +107,7 @@ func (g *GRPCServer) Initialize(ctx context.Context, option map[string]interface
// Run 启动GRPC API服务器
func (g *GRPCServer) Run(errCh chan error) {
g.BaseGrpcServer.Run(errCh, g.GetProtocol(), func(server *grpc.Server) error {
defer bootstrap.ApiServerWaitGroup.Done()
for name, config := range g.openAPI {
switch name {
case "client":
Expand Down
6 changes: 4 additions & 2 deletions apiserver/httpserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"strings"
"time"

"github.com/emicklei/go-restful/v3"
restful "github.com/emicklei/go-restful/v3"
"github.com/pkg/errors"
"go.uber.org/zap"

Expand All @@ -35,6 +35,7 @@ import (
v1 "github.com/polarismesh/polaris/apiserver/httpserver/v1"
v2 "github.com/polarismesh/polaris/apiserver/httpserver/v2"
"github.com/polarismesh/polaris/auth"
"github.com/polarismesh/polaris/bootstrap"
api "github.com/polarismesh/polaris/common/api/v1"
"github.com/polarismesh/polaris/common/connlimit"
commonlog "github.com/polarismesh/polaris/common/log"
Expand Down Expand Up @@ -220,6 +221,7 @@ func (h *HTTPServer) Run(errCh chan error) {
errCh <- err
return
}
bootstrap.ApiServerWaitGroup.Done()

ln = &tcpKeepAliveListener{ln.(*net.TCPListener)}
// 开启最大连接数限制
Expand Down Expand Up @@ -318,7 +320,7 @@ func (h *HTTPServer) createRestfulContainer() (*restful.Container, error) {
Container: wsContainer}
wsContainer.Filter(cors.Filter)

// Add container filter to respond to OPTIONS
// Incr container filter to respond to OPTIONS
wsContainer.Filter(wsContainer.OPTIONSFilter)

wsContainer.Filter(h.process)
Expand Down
2 changes: 2 additions & 0 deletions apiserver/l5pbserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.uber.org/zap"

"github.com/polarismesh/polaris/apiserver"
"github.com/polarismesh/polaris/bootstrap"
"github.com/polarismesh/polaris/common/api/l5"
"github.com/polarismesh/polaris/plugin"
"github.com/polarismesh/polaris/service"
Expand Down Expand Up @@ -105,6 +106,7 @@ func (l *L5pbserver) Run(errCh chan error) {
errCh <- err
return
}
bootstrap.ApiServerWaitGroup.Done()
l.listener = listener

for {
Expand Down
6 changes: 4 additions & 2 deletions apiserver/prometheussd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
"strings"
"time"

"github.com/emicklei/go-restful/v3"
restful "github.com/emicklei/go-restful/v3"
"go.uber.org/zap"

"github.com/polarismesh/polaris/apiserver"
"github.com/polarismesh/polaris/bootstrap"
"github.com/polarismesh/polaris/common/connlimit"
"github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/common/utils"
Expand Down Expand Up @@ -115,6 +116,7 @@ func (h *PrometheusServer) Run(errCh chan error) {
errCh <- err
return
}
bootstrap.ApiServerWaitGroup.Done()

ln = &tcpKeepAliveListener{ln.(*net.TCPListener)}
// 开启最大连接数限制
Expand Down Expand Up @@ -207,7 +209,7 @@ func (h *PrometheusServer) createRestfulContainer() (*restful.Container, error)
Container: wsContainer}
wsContainer.Filter(cors.Filter)

// Add container filter to respond to OPTIONS
// Incr container filter to respond to OPTIONS
wsContainer.Filter(wsContainer.OPTIONSFilter)

wsContainer.Filter(h.process)
Expand Down
4 changes: 3 additions & 1 deletion apiserver/xdsserverv3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/golang/protobuf/ptypes"
_struct "github.com/golang/protobuf/ptypes/struct"
Expand All @@ -54,6 +54,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/polarismesh/polaris/apiserver"
"github.com/polarismesh/polaris/bootstrap"
"github.com/polarismesh/polaris/cache"
api "github.com/polarismesh/polaris/common/api/v1"
"github.com/polarismesh/polaris/common/connlimit"
Expand Down Expand Up @@ -160,6 +161,7 @@ func (x *XDSServer) Run(errCh chan error) {
errCh <- err
return
}
bootstrap.ApiServerWaitGroup.Done()

if x.connLimitConfig != nil && x.connLimitConfig.OpenConnLimit {
log.Infof("grpc server use max connection limit: %d, grpc max limit: %d",
Expand Down
10 changes: 8 additions & 2 deletions bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/golang/protobuf/ptypes/wrappers"
"gopkg.in/yaml.v2"
yaml "gopkg.in/yaml.v2"

"github.com/polarismesh/polaris/apiserver"
"github.com/polarismesh/polaris/auth"
Expand All @@ -52,6 +53,7 @@ var (
SelfServiceInstance = make([]*api.Instance, 0)
ConfigFilePath = ""
selfHeathChecker *SelfHeathChecker
ApiServerWaitGroup = new(sync.WaitGroup)
)

// Start 启动
Expand Down Expand Up @@ -292,6 +294,9 @@ func StartServers(ctx context.Context, cfg *boot_config.Config, errCh chan error
[]apiserver.Apiserver, error) {
// 启动API服务器
var servers []apiserver.Apiserver

// 等待所有ApiServer都监听完成

for _, protocol := range cfg.APIServers {
slot, exist := apiserver.Slots[protocol.Name]
if !exist {
Expand All @@ -306,9 +311,10 @@ func StartServers(ctx context.Context, cfg *boot_config.Config, errCh chan error
}

servers = append(servers, slot)
ApiServerWaitGroup.Add(1)
go slot.Run(errCh)
}

ApiServerWaitGroup.Wait()
return servers, nil
}

Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package main

import (
"github.com/polarismesh/polaris/cmd"

_ "go.uber.org/automaxprocs"

"github.com/polarismesh/polaris/cmd"
)

func main() {
Expand Down
3 changes: 2 additions & 1 deletion plugin/whitelist/ip_whitelist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"errors"
"testing"

"github.com/polarismesh/polaris/plugin"
"github.com/stretchr/testify/assert"

"github.com/polarismesh/polaris/plugin"
)

func Test_ipWhitelist_Name(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions store/sqldb/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"errors"
"fmt"

_ "github.com/go-sql-driver/mysql"

"github.com/polarismesh/polaris/plugin"
"github.com/polarismesh/polaris/store"

_ "github.com/go-sql-driver/mysql"
)

const (
Expand Down

0 comments on commit 9679e45

Please sign in to comment.