Skip to content

Commit

Permalink
Add batch clean instances maintian interface (#784)
Browse files Browse the repository at this point in the history
* Add batch clean instances maintian interface

* fix golangci-lint

* fix lint

* update store api_mock

* update mock readme

* update boltdb logic delete

* refactor batch clean
  • Loading branch information
shichaoyuan authored Nov 11, 2022
1 parent efe6137 commit 481935e
Show file tree
Hide file tree
Showing 32 changed files with 432 additions and 152 deletions.
13 changes: 13 additions & 0 deletions apiserver/httpserver/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"

Expand Down Expand Up @@ -243,3 +244,15 @@ func ParseQueryParams(req *restful.Request) map[string]string {

return queryParams
}

// ParseJsonBody parse http body as json object
func ParseJsonBody(req *restful.Request, value interface{}) error {
body, err := ioutil.ReadAll(req.Request.Body)
if err != nil {
return err
}
if err := json.Unmarshal(body, value); err != nil {
return err
}
return nil
}
26 changes: 26 additions & 0 deletions apiserver/httpserver/http/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package http

import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/emicklei/go-restful/v3"
Expand Down Expand Up @@ -73,3 +75,27 @@ func Test_i18n(t *testing.T) {
}
}
}

func Test_ParseJsonBody(t *testing.T) {
type TestJsonObject struct {
Text string `json:"text"`
}

expectText := "this is a test"

httpReq, _ := http.NewRequest(
http.MethodPost,
"http://example.com",
strings.NewReader(fmt.Sprintf("{\"text\": \"%s\"}", expectText)))
req := restful.NewRequest(httpReq)

testResult := TestJsonObject{}
err := ParseJsonBody(req, &testResult)
if err != nil {
t.Errorf("ParseJsonBody err %v, want %v", err, expectText)
}
if testResult.Text != expectText {
t.Errorf("ParseJsonBody = %v, want %v", testResult.Text, expectText)
}

}
34 changes: 27 additions & 7 deletions apiserver/httpserver/maintain_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package httpserver
import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"strconv"

Expand All @@ -42,6 +41,7 @@ func (h *HTTPServer) GetMaintainAccessServer() *restful.WebService {
ws.Route(enrichCloseConnectionsApiDocs(ws.POST("apiserver/conn/close").To(h.CloseConnections)))
ws.Route(enrichFreeOSMemoryApiDocs(ws.POST("/memory/free").To(h.FreeOSMemory)))
ws.Route(enrichCleanInstanceApiDocs(ws.POST("/instance/clean").To(h.CleanInstance)))
ws.Route(enrichBatchCleanInstancesApiDocs(ws.POST("/instance/batchclean").To(h.BatchCleanInstances)))
ws.Route(enrichGetLastHeartbeatApiDocs(ws.GET("/instance/heartbeat").To(h.GetLastHeartbeat)))
ws.Route(enrichGetLogOutputLevelApiDocs(ws.GET("/log/outputlevel").To(h.GetLogOutputLevel)))
ws.Route(enrichSetLogOutputLevelApiDocs(ws.PUT("/log/outputlevel").To(h.SetLogOutputLevel)))
Expand Down Expand Up @@ -149,6 +149,30 @@ func (h *HTTPServer) CleanInstance(req *restful.Request, rsp *restful.Response)
handler.WriteHeaderAndProto(h.maintainServer.CleanInstance(ctx, instance))
}

func (h *HTTPServer) BatchCleanInstances(req *restful.Request, rsp *restful.Response) {
ctx := initContext(req)

var param struct {
BatchSize uint32 `json:"batch_size"`
}

if err := httpcommon.ParseJsonBody(req, &param); err != nil {
_ = rsp.WriteError(http.StatusBadRequest, err)
return
}

if count, err := h.maintainServer.BatchCleanInstances(ctx, param.BatchSize); err != nil {
_ = rsp.WriteError(http.StatusInternalServerError, err)
} else {
var ret struct {
RowsAffected uint32 `json:"rows_affected"`
}
ret.RowsAffected = count
_ = rsp.WriteAsJson(ret)
}

}

// GetLastHeartbeat 获取实例,上一次心跳的时间
func (h *HTTPServer) GetLastHeartbeat(req *restful.Request, rsp *restful.Response) {
ctx := initContext(req)
Expand Down Expand Up @@ -193,12 +217,8 @@ func (h *HTTPServer) SetLogOutputLevel(req *restful.Request, rsp *restful.Respon
Scope string `json:"scope"`
Level string `json:"level"`
}
body, err := ioutil.ReadAll(req.Request.Body)
if err != nil {
_ = rsp.WriteErrorString(http.StatusBadRequest, err.Error())
return
}
if err := json.Unmarshal(body, &scopeLogLevel); err != nil {

if err := httpcommon.ParseJsonBody(req, &scopeLogLevel); err != nil {
_ = rsp.WriteErrorString(http.StatusBadRequest, err.Error())
return
}
Expand Down
7 changes: 7 additions & 0 deletions apiserver/httpserver/maintain_apidoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func enrichCleanInstanceApiDocs(r *restful.RouteBuilder) *restful.RouteBuilder {
Notes(enrichCleanInstanceApiNotes)
}

func enrichBatchCleanInstancesApiDocs(r *restful.RouteBuilder) *restful.RouteBuilder {
return r.
Doc("彻底清理flag=1的实例").
Metadata(restfulspec.KeyOpenAPITags, maintainApiTags).
Notes(enrichBatchCleanInstancesApiNotes)
}

func enrichGetLastHeartbeatApiDocs(r *restful.RouteBuilder) *restful.RouteBuilder {
return r.
Doc("获取上一次心跳的时间").
Expand Down
14 changes: 14 additions & 0 deletions apiserver/httpserver/maintain_apinotes.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ Header Content-Type: application/json
}
~~~
`
enrichBatchCleanInstancesApiNotes = `
请求示例:
~~~
POST /maintain/v1/instance/batchclean
Header X-Polaris-Token: {访问凭据}
Header Content-Type: application/json
{
"batch_size": 100
}
~~~
`
enrichGetLastHeartbeatApiNotes = `
请求示例:
Expand Down
2 changes: 1 addition & 1 deletion bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func StartComponents(ctx context.Context, cfg *boot_config.Config) error {
}

// 初始化运维操作模块
if err := maintain.Initialize(ctx, namingSvr, healthCheckServer); err != nil {
if err := maintain.Initialize(ctx, namingSvr, healthCheckServer, s); err != nil {
return err
}

Expand Down
3 changes: 3 additions & 0 deletions maintain/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type MaintainOperateServer interface {
// CleanInstance Clean deleted instance
CleanInstance(ctx context.Context, req *api.Instance) *api.Response

// BatchCleanInstances Batch clean deleted instances
BatchCleanInstances(ctx context.Context, batchSize uint32) (uint32, error)

// GetLastHeartbeat Get last heartbeat
GetLastHeartbeat(ctx context.Context, req *api.Instance) *api.Response

Expand Down
8 changes: 5 additions & 3 deletions maintain/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/polarismesh/polaris/auth"
"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/service/healthcheck"
"github.com/polarismesh/polaris/store"
)

var (
Expand All @@ -33,12 +34,12 @@ var (
)

// Initialize 初始化
func Initialize(ctx context.Context, namingService service.DiscoverServer, healthCheckServer *healthcheck.Server) error {
func Initialize(ctx context.Context, namingService service.DiscoverServer, healthCheckServer *healthcheck.Server, storage store.Store) error {
if finishInit {
return nil
}

err := initialize(ctx, namingService, healthCheckServer)
err := initialize(ctx, namingService, healthCheckServer, storage)
if err != nil {
return err
}
Expand All @@ -47,14 +48,15 @@ func Initialize(ctx context.Context, namingService service.DiscoverServer, healt
return nil
}

func initialize(_ context.Context, namingService service.DiscoverServer, healthCheckServer *healthcheck.Server) error {
func initialize(_ context.Context, namingService service.DiscoverServer, healthCheckServer *healthcheck.Server, storage store.Store) error {
authServer, err := auth.GetAuthServer()
if err != nil {
return err
}

maintainServer.namingServer = namingService
maintainServer.healthCheckServer = healthCheckServer
maintainServer.storage = storage

server = newServerAuthAbility(maintainServer, authServer)
return nil
Expand Down
4 changes: 4 additions & 0 deletions maintain/maintain.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func (s *Server) CleanInstance(ctx context.Context, req *api.Instance) *api.Resp
return s.namingServer.CleanInstance(ctx, req)
}

func (s *Server) BatchCleanInstances(ctx context.Context, batchSize uint32) (uint32, error) {
return s.storage.BatchCleanDeletedInstances(batchSize)
}

func (s *Server) GetLastHeartbeat(_ context.Context, req *api.Instance) *api.Response {
return s.healthCheckServer.GetLastHeartbeat(req)
}
Expand Down
10 changes: 10 additions & 0 deletions maintain/maintain_authability.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ func (svr *serverAuthAbility) CleanInstance(ctx context.Context, req *api.Instan
return svr.targetServer.CleanInstance(ctx, req)
}

func (svr *serverAuthAbility) BatchCleanInstances(ctx context.Context, batchSize uint32) (uint32, error) {
authCtx := svr.collectMaintainAuthContext(ctx, model.Delete, "BatchCleanInstances")
_, err := svr.authMgn.CheckConsolePermission(authCtx)
if err != nil {
return 0, err
}

return svr.targetServer.BatchCleanInstances(ctx, batchSize)
}

func (svr *serverAuthAbility) GetLastHeartbeat(ctx context.Context, req *api.Instance) *api.Response {
authCtx := svr.collectMaintainAuthContext(ctx, model.Read, "GetLastHeartbeat")
_, err := svr.authMgn.CheckConsolePermission(authCtx)
Expand Down
2 changes: 2 additions & 0 deletions maintain/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/service/healthcheck"
"github.com/polarismesh/polaris/store"
)

var _ MaintainOperateServer = (*Server)(nil)
Expand All @@ -30,4 +31,5 @@ type Server struct {
mu sync.Mutex
namingServer service.DiscoverServer
healthCheckServer *healthcheck.Server
storage store.Store
}
3 changes: 3 additions & 0 deletions store/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type Store interface {

// ClientStore Client the central module storage interface
ClientStore

// MaintainStore Maintain inteface
MaintainStore
}

// NamespaceStore Namespace storage interface
Expand Down
2 changes: 1 addition & 1 deletion store/boltdb/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (c *circuitBreakerStore) CreateCircuitBreaker(cb *model.CircuitBreaker) err

// cleanCircuitBreaker 彻底清理熔断规则
func (c *circuitBreakerStore) cleanCircuitBreaker(id string, version string) error {
if err := c.handler.DeleteValues(tblCircuitBreaker, []string{c.buildKey(id, version)}, false); err != nil {
if err := c.handler.DeleteValues(tblCircuitBreaker, []string{c.buildKey(id, version)}); err != nil {
log.Errorf("[Store][circuitBreaker] clean invalid circuit-breaker(%s, %s) err: %s",
id, version, err.Error())
return store.Error(err)
Expand Down
4 changes: 2 additions & 2 deletions store/boltdb/config_file_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (t *configFileTagStore) DeleteConfigFileTag(proxyTx store.Tx, namespace, gr
_, err := DoTransactionIfNeed(proxyTx, t.handler, func(tx *bolt.Tx) ([]interface{}, error) {
dataKey := fmt.Sprintf("%s@%s@%s@%s@%s", key, value, namespace, group, fileName)

if err := deleteValues(tx, tbleConfigFileTag, []string{dataKey}, false); err != nil {
if err := deleteValues(tx, tbleConfigFileTag, []string{dataKey}); err != nil {
return nil, err
}
return nil, nil
Expand Down Expand Up @@ -212,7 +212,7 @@ func (t *configFileTagStore) DeleteTagByConfigFile(proxyTx store.Tx, namespace,
keys = append(keys, dataKey)
}

if err := deleteValues(tx, tbleConfigFileTag, keys, false); err != nil {
if err := deleteValues(tx, tbleConfigFileTag, keys); err != nil {
return nil, err
}

Expand Down
13 changes: 13 additions & 0 deletions store/boltdb/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type boltStore struct {
// v2 存储
*routingStoreV2

// maintain store
*maintainStore

handler BoltHandler
start bool
}
Expand Down Expand Up @@ -261,6 +264,10 @@ func (m *boltStore) newStore() error {
return err
}

if err := m.newMaintainModuleStore(); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -326,6 +333,12 @@ func (m *boltStore) newConfigModuleStore() error {
return nil
}

func (m *boltStore) newMaintainModuleStore() error {
m.maintainStore = &maintainStore{handler: m.handler}

return nil
}

// Destroy store
func (m *boltStore) Destroy() error {
m.start = false
Expand Down
2 changes: 1 addition & 1 deletion store/boltdb/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (gs *groupStore) cleanInValidGroup(tx *bolt.Tx, name, owner string) error {
keys = append(keys, k)
}

return deleteValues(tx, tblGroup, keys, false)
return deleteValues(tx, tblGroup, keys)
}

func convertForGroupStore(group *model.UserGroupDetail) *groupForStore {
Expand Down
20 changes: 7 additions & 13 deletions store/boltdb/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type BoltHandler interface {
SaveValue(typ string, key string, object interface{}) error

// DeleteValues delete data object by unique key
DeleteValues(typ string, key []string, logicDelete bool) error
DeleteValues(typ string, key []string) error

// UpdateValue update properties of data object
UpdateValue(typ string, key string, properties map[string]interface{}) error
Expand Down Expand Up @@ -388,31 +388,25 @@ func (b *boltHandler) Close() error {
}

// DeleteValues delete data object by unique key
func (b *boltHandler) DeleteValues(typ string, keys []string, logicDelete bool) error {
func (b *boltHandler) DeleteValues(typ string, keys []string) error {
if len(keys) == 0 {
return nil
}
return b.db.Update(func(tx *bolt.Tx) error {
return deleteValues(tx, typ, keys, logicDelete)
return deleteValues(tx, typ, keys)
})
}

func deleteValues(tx *bolt.Tx, typ string, keys []string, logicDelete bool) error {
func deleteValues(tx *bolt.Tx, typ string, keys []string) error {
typeBucket := tx.Bucket([]byte(typ))
if typeBucket == nil {
return nil
}
for _, key := range keys {
keyBytes := []byte(key)
if subBucket := typeBucket.Bucket(keyBytes); subBucket != nil {
if logicDelete {
if err := subBucket.Put([]byte(toBucketField(DataValidFieldName)), encodeBoolBuffer(false)); err != nil {
return err
}
} else {
if err := typeBucket.DeleteBucket(keyBytes); err != nil {
return err
}
if err := typeBucket.DeleteBucket(keyBytes); err != nil {
return err
}
}
}
Expand Down Expand Up @@ -481,7 +475,7 @@ func (b *boltHandler) CountValues(typ string) (int, error) {
canCount := true

if subBucket != nil {
data := subBucket.Get([]byte(DataValidFieldName))
data := subBucket.Get([]byte(toBucketField(DataValidFieldName)))
if len(data) == 0 {
canCount = true
} else {
Expand Down
Loading

0 comments on commit 481935e

Please sign in to comment.