Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
louyuting committed Jul 20, 2020
1 parent 14500c5 commit 76fa7e0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 58 deletions.
85 changes: 29 additions & 56 deletions ext/datasource/nacos/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/clients/nacos_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/common/http_agent"
"github.com/nacos-group/nacos-sdk-go/vo"
"github.com/pkg/errors"
)
Expand All @@ -16,37 +13,20 @@ var (
logger = logging.GetDefaultLogger()
)

type ConfigParam struct {
DataId string
Group string
}

var clientConfig = constant.ClientConfig{
TimeoutMs: 10 * 1000,
ListenInterval: 30 * 1000,
BeatInterval: 5 * 1000,
LogDir: "/nacos/logs",
CacheDir: "/nacos/cache",
}

type NacosDataSource struct {
datasource.Base
serverConfig constant.ServerConfig
clientConfig constant.ClientConfig
client *config_client.ConfigClient
isInitialized util.AtomicBool
configParam ConfigParam
listener *vo.ConfigParam
getConfig vo.ConfigParam
closeChan chan struct{}
}

func NewNacosDataSource(clientConfig constant.ClientConfig, serverConfig constant.ServerConfig, configParam ConfigParam, handlers ...datasource.PropertyHandler) (*NacosDataSource, error) {
if len(serverConfig.IpAddr) <= 0 || serverConfig.Port <= 0 {
return nil, errors.New("The nacos serverConfig is incorrect.")
}
func NewNacosDataSource(client *config_client.ConfigClient, getConfig vo.ConfigParam, handlers ...datasource.PropertyHandler) (*NacosDataSource, error) {
var ds = &NacosDataSource{
clientConfig: clientConfig,
serverConfig: serverConfig,
configParam: configParam,
Base: datasource.Base{},
client: client,
getConfig: getConfig,
closeChan: make(chan struct{}, 1),
}
for _, h := range handlers {
ds.AddPropertyHandler(h)
Expand All @@ -58,58 +38,51 @@ func (s *NacosDataSource) Initialize() error {
if !s.isInitialized.CompareAndSet(false, true) {
return nil
}
nc, err := buildNacosClient(s)
client, err := config_client.NewConfigClient(nc)
data, err := s.ReadSource()
if err != nil {
return errors.Errorf("Nacosclient failed to build, err: %+v", err)
return err
}
s.client = &client
err = s.listen(s.client)
return err
if err = s.doUpdate(data); err != nil {
return err
}
return s.listen(s.client)
}

func buildNacosClient(s *NacosDataSource) (nacos_client.INacosClient, error) {
nc := nacos_client.NacosClient{}
nc.SetServerConfig([]constant.ServerConfig{s.serverConfig})
err := nc.SetClientConfig(s.clientConfig)
err = nc.SetHttpAgent(&http_agent.HttpAgent{})
return &nc, err
}
func (s *NacosDataSource) ReadSource() ([]byte, error) {
content, err := s.client.GetConfig(vo.ConfigParam{
DataId: s.configParam.DataId,
Group: s.configParam.Group,
})
content, err := s.client.GetConfig(s.getConfig)
if err != nil {
return nil, errors.Errorf("Failed to read the nacos data source, err: %+v", err)
}
return []byte(content), err
}

func (s *NacosDataSource) doUpdate(data []byte) error {
return s.Handle(data)
}

func (s *NacosDataSource) listen(client *config_client.ConfigClient) (err error) {
s.listener = &vo.ConfigParam{
DataId: s.configParam.DataId,
Group: s.configParam.Group,
listener := vo.ConfigParam{
DataId: s.getConfig.DataId,
Group: s.getConfig.Group,
OnChange: func(namespace, group, dataId, data string) {
if err := s.Handle([]byte(data)); err != nil {
logger.Errorf("Fail to update data for dataId:[%s] group:[%s] namespaceId:[%s] when execute "+
"listen function, err: %+v", data, group, namespace, err)
err := s.doUpdate([]byte(data))
if err != nil {
logger.Errorf("")
}

},
ListenCloseChan: make(chan struct{}, 1),
ListenCloseChan: s.closeChan,
}
err = client.ListenConfig(*s.listener)
err = client.ListenConfig(listener)
if err != nil {
return errors.Errorf("Failed to listen to the nacos data source, err: %+v", err)
}
return
}

func (s *NacosDataSource) Close() error {
s.listener.ListenCloseChan <- struct{}{}
s.closeChan <- struct{}{}

logger.Infof("The RefreshableFileDataSource had been closed. DataId:[%s],Group:[%s]",
s.configParam.DataId, s.configParam.Group)
logger.Infof("The nacos datasource had been closed. DataId:[%s],Group:[%s]",
s.getConfig.DataId, s.getConfig.Group)
return nil
}
4 changes: 2 additions & 2 deletions ext/datasource/nacos/nacos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ const (

var serverConfig = constant.ServerConfig{
ContextPath: "/nacos",
Port: 80,
IpAddr: "console.nacos.io",
Port: 8848,
IpAddr: "127.0.0.1",
}
var serverConfigErr = constant.ServerConfig{
ContextPath: "/nacos",
Expand Down

0 comments on commit 76fa7e0

Please sign in to comment.