From 76fa7e0362e996a4353afbf8cb05b0f1c07af860 Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Mon, 20 Jul 2020 23:47:13 +0800 Subject: [PATCH] refactor --- ext/datasource/nacos/nacos.go | 85 ++++++++++-------------------- ext/datasource/nacos/nacos_test.go | 4 +- 2 files changed, 31 insertions(+), 58 deletions(-) diff --git a/ext/datasource/nacos/nacos.go b/ext/datasource/nacos/nacos.go index eac7a5dcb..ee48bde2d 100644 --- a/ext/datasource/nacos/nacos.go +++ b/ext/datasource/nacos/nacos.go @@ -5,9 +5,6 @@ import ( "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/nacos-group/nacos-sdk-go/clients/config_client" - "github.com/nacos-group/nacos-sdk-go/clients/nacos_client" - "github.com/nacos-group/nacos-sdk-go/common/constant" - "github.com/nacos-group/nacos-sdk-go/common/http_agent" "github.com/nacos-group/nacos-sdk-go/vo" "github.com/pkg/errors" ) @@ -16,37 +13,20 @@ var ( logger = logging.GetDefaultLogger() ) -type ConfigParam struct { - DataId string - Group string -} - -var clientConfig = constant.ClientConfig{ - TimeoutMs: 10 * 1000, - ListenInterval: 30 * 1000, - BeatInterval: 5 * 1000, - LogDir: "/nacos/logs", - CacheDir: "/nacos/cache", -} - type NacosDataSource struct { datasource.Base - serverConfig constant.ServerConfig - clientConfig constant.ClientConfig client *config_client.ConfigClient isInitialized util.AtomicBool - configParam ConfigParam - listener *vo.ConfigParam + getConfig vo.ConfigParam + closeChan chan struct{} } -func NewNacosDataSource(clientConfig constant.ClientConfig, serverConfig constant.ServerConfig, configParam ConfigParam, handlers ...datasource.PropertyHandler) (*NacosDataSource, error) { - if len(serverConfig.IpAddr) <= 0 || serverConfig.Port <= 0 { - return nil, errors.New("The nacos serverConfig is incorrect.") - } +func NewNacosDataSource(client *config_client.ConfigClient, getConfig vo.ConfigParam, handlers ...datasource.PropertyHandler) (*NacosDataSource, error) { var ds = &NacosDataSource{ - clientConfig: clientConfig, - serverConfig: serverConfig, - configParam: configParam, + Base: datasource.Base{}, + client: client, + getConfig: getConfig, + closeChan: make(chan struct{}, 1), } for _, h := range handlers { ds.AddPropertyHandler(h) @@ -58,48 +38,41 @@ func (s *NacosDataSource) Initialize() error { if !s.isInitialized.CompareAndSet(false, true) { return nil } - nc, err := buildNacosClient(s) - client, err := config_client.NewConfigClient(nc) + data, err := s.ReadSource() if err != nil { - return errors.Errorf("Nacosclient failed to build, err: %+v", err) + return err } - s.client = &client - err = s.listen(s.client) - return err + if err = s.doUpdate(data); err != nil { + return err + } + return s.listen(s.client) } -func buildNacosClient(s *NacosDataSource) (nacos_client.INacosClient, error) { - nc := nacos_client.NacosClient{} - nc.SetServerConfig([]constant.ServerConfig{s.serverConfig}) - err := nc.SetClientConfig(s.clientConfig) - err = nc.SetHttpAgent(&http_agent.HttpAgent{}) - return &nc, err -} func (s *NacosDataSource) ReadSource() ([]byte, error) { - content, err := s.client.GetConfig(vo.ConfigParam{ - DataId: s.configParam.DataId, - Group: s.configParam.Group, - }) + content, err := s.client.GetConfig(s.getConfig) if err != nil { return nil, errors.Errorf("Failed to read the nacos data source, err: %+v", err) } return []byte(content), err } +func (s *NacosDataSource) doUpdate(data []byte) error { + return s.Handle(data) +} + func (s *NacosDataSource) listen(client *config_client.ConfigClient) (err error) { - s.listener = &vo.ConfigParam{ - DataId: s.configParam.DataId, - Group: s.configParam.Group, + listener := vo.ConfigParam{ + DataId: s.getConfig.DataId, + Group: s.getConfig.Group, OnChange: func(namespace, group, dataId, data string) { - if err := s.Handle([]byte(data)); err != nil { - logger.Errorf("Fail to update data for dataId:[%s] group:[%s] namespaceId:[%s] when execute "+ - "listen function, err: %+v", data, group, namespace, err) + err := s.doUpdate([]byte(data)) + if err != nil { + logger.Errorf("") } - }, - ListenCloseChan: make(chan struct{}, 1), + ListenCloseChan: s.closeChan, } - err = client.ListenConfig(*s.listener) + err = client.ListenConfig(listener) if err != nil { return errors.Errorf("Failed to listen to the nacos data source, err: %+v", err) } @@ -107,9 +80,9 @@ func (s *NacosDataSource) listen(client *config_client.ConfigClient) (err error) } func (s *NacosDataSource) Close() error { - s.listener.ListenCloseChan <- struct{}{} + s.closeChan <- struct{}{} - logger.Infof("The RefreshableFileDataSource had been closed. DataId:[%s],Group:[%s]", - s.configParam.DataId, s.configParam.Group) + logger.Infof("The nacos datasource had been closed. DataId:[%s],Group:[%s]", + s.getConfig.DataId, s.getConfig.Group) return nil } diff --git a/ext/datasource/nacos/nacos_test.go b/ext/datasource/nacos/nacos_test.go index 47001e617..6222ed734 100644 --- a/ext/datasource/nacos/nacos_test.go +++ b/ext/datasource/nacos/nacos_test.go @@ -36,8 +36,8 @@ const ( var serverConfig = constant.ServerConfig{ ContextPath: "/nacos", - Port: 80, - IpAddr: "console.nacos.io", + Port: 8848, + IpAddr: "127.0.0.1", } var serverConfigErr = constant.ServerConfig{ ContextPath: "/nacos",