Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support data sync between eureka and polaris #818

Merged
merged 10 commits into from
Nov 24, 2022
82 changes: 48 additions & 34 deletions apiserver/eurekaserver/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"net/http"
"strings"

restful "github.com/emicklei/go-restful/v3"
"github.com/emicklei/go-restful/v3"

api "github.com/polarismesh/polaris/common/api/v1"
"github.com/polarismesh/polaris/common/utils"
Expand Down Expand Up @@ -113,6 +113,8 @@ func (h *EurekaServer) addDiscoverAccess(ws *restful.WebService) {
// Query for all instances under a particular secure vip address
ws.Route(ws.GET(fmt.Sprintf("/svips/{%s}", ParamSVip)).To(h.QueryBySVipAddress)).
Param(ws.PathParameter(ParamSVip, "svipAddress").DataType("string"))
// Query for handling batch replication request
ws.Route(ws.POST("/peerreplication/batch").To(h.BatchReplication))
}

func parseAcceptValue(acceptValue string) map[string]bool {
Expand Down Expand Up @@ -279,6 +281,27 @@ func (h *EurekaServer) GetDeltaApplications(req *restful.Request, rsp *restful.R
}
}

func convertInstancePorts(instance *InstanceInfo) error {
var err error
if nil != instance.Port {
if err = instance.Port.convertPortValue(); nil != err {
return err
}
if err = instance.Port.convertEnableValue(); nil != err {
return err
}
}
if nil != instance.SecurePort {
if err = instance.SecurePort.convertPortValue(); nil != err {
return err
}
if err = instance.SecurePort.convertEnableValue(); nil != err {
return err
}
}
return nil
}

func checkRegisterRequest(registrationRequest *RegistrationRequest, req *restful.Request, rsp *restful.Response) bool {
var err error
remoteAddr := req.Request.RemoteAddr
Expand All @@ -289,37 +312,12 @@ func checkRegisterRequest(registrationRequest *RegistrationRequest, req *restful
writeHeader(http.StatusBadRequest, rsp)
return false
}
if nil != registrationRequest.Instance.Port {
if err = registrationRequest.Instance.Port.convertPortValue(); nil != err {
log.Errorf("[EUREKA-SERVER] fail to parse instance register request, "+
"invalid insecure port value, client: %s, err: %v", remoteAddr, err)
writePolarisStatusCode(req, api.InvalidInstancePort)
writeHeader(http.StatusBadRequest, rsp)
return false
}
if err = registrationRequest.Instance.Port.convertEnableValue(); nil != err {
log.Errorf("[EUREKA-SERVER] fail to parse instance register request, "+
"invalid insecure enable value, client: %s, err: %v", remoteAddr, err)
writePolarisStatusCode(req, api.InvalidInstancePort)
writeHeader(http.StatusBadRequest, rsp)
return false
}
}
if nil != registrationRequest.Instance.SecurePort {
if err = registrationRequest.Instance.SecurePort.convertPortValue(); nil != err {
log.Errorf("[EUREKA-SERVER] fail to parse instance register request, "+
"invalid secure port value, client: %s, err: %v", remoteAddr, err)
writePolarisStatusCode(req, api.InvalidInstancePort)
writeHeader(http.StatusBadRequest, rsp)
return false
}
if err = registrationRequest.Instance.SecurePort.convertEnableValue(); nil != err {
log.Errorf("[EUREKA-SERVER] fail to parse instance register request, "+
"invalid secure enable value, client: %s, err: %v", remoteAddr, err)
writePolarisStatusCode(req, api.InvalidInstancePort)
writeHeader(http.StatusBadRequest, rsp)
return false
}
err = convertInstancePorts(registrationRequest.Instance)
if nil != err {
log.Errorf("[EUREKA-SERVER] fail to parse instance register request, "+
"invalid port value, client: %s, err: %v", remoteAddr, err)
writePolarisStatusCode(req, api.InvalidInstancePort)
writeHeader(http.StatusBadRequest, rsp)
}
return true
}
Expand Down Expand Up @@ -370,7 +368,7 @@ func (h *EurekaServer) RegisterApplication(req *restful.Request, rsp *restful.Re

log.Infof("[EUREKA-SERVER]received instance register request, client: %s, instId: %s, appId: %s, ipAddr: %s",
remoteAddr, registrationRequest.Instance.InstanceId, appId, registrationRequest.Instance.IpAddr)
code := h.registerInstances(ctx, appId, registrationRequest.Instance)
code := h.registerInstances(ctx, appId, registrationRequest.Instance, false)
if code == api.ExecuteSuccess || code == api.ExistedResource || code == api.SameInstanceRequest {
log.Infof("[EUREKA-SERVER]instance (instId=%s, appId=%s) has been registered successfully, code is %d",
registrationRequest.Instance.InstanceId, appId, code)
Expand Down Expand Up @@ -457,6 +455,11 @@ func (h *EurekaServer) DeleteStatus(req *restful.Request, rsp *restful.Response)
log.Infof("[EUREKA-SERVER]instance status (instId=%s, appId=%s) has been deleted successfully",
instId, appId)
writeHeader(http.StatusOK, rsp)
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
AppName: appId,
Id: instId,
Action: actionDeleteStatusOverride,
})
return
}
log.Errorf("[EUREKA-SERVER]instance status (instId=%s, appId=%s) has been deleted failed, code is %d",
Expand Down Expand Up @@ -491,6 +494,12 @@ func (h *EurekaServer) RenewInstance(req *restful.Request, rsp *restful.Response
writePolarisStatusCode(req, code)
if code == api.ExecuteSuccess || code == api.HeartbeatExceedLimit {
writeHeader(http.StatusOK, rsp)
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
AppName: appId,
Id: instId,
Status: "UP",
Action: actionHeartbeat,
})
return
}
log.Errorf("[EUREKA-SERVER]instance (instId=%s, appId=%s) heartbeat failed, code is %d",
Expand Down Expand Up @@ -529,6 +538,11 @@ func (h *EurekaServer) CancelInstance(req *restful.Request, rsp *restful.Respons
writeHeader(http.StatusOK, rsp)
log.Infof("[EUREKA-SERVER]instance (instId=%s, appId=%s) has been deregistered successfully, code is %d",
instId, appId, code)
h.replicateWorker.AddReplicateTask(&ReplicationInstance{
AppName: appId,
Id: instId,
Action: actionCancel,
})
return
}
log.Errorf("[EUREKA-SERVER]instance (instId=%s, appId=%s) has been deregistered failed, code is %d",
Expand Down Expand Up @@ -656,7 +670,7 @@ func (h *EurekaServer) QueryBySVipAddress(req *restful.Request, rsp *restful.Res

func (h *EurekaServer) formatName(appId string) string {
// 如果开启忽略大小写 则统一转成小写,
if h.ignoreUpLow {
if h.caseSensitive {
appId = strings.ToLower(appId)
} else {
appId = strings.ToUpper(appId)
Expand Down
64 changes: 36 additions & 28 deletions apiserver/eurekaserver/applications.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync/atomic"
"time"

api "github.com/polarismesh/polaris/common/api/v1"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/service"
)
Expand Down Expand Up @@ -177,11 +178,7 @@ func (a *ApplicationsBuilder) constructApplication(app *Application, instances [
if !instance.Healthy() && !fallbackUnhealthy {
continue
}
var (
instanceInfo *InstanceInfo
eurekaInstanceId = instance.Proto.GetId().GetValue()
)
instanceInfo = buildInstance(app, eurekaInstanceId, instance)
instanceInfo := buildInstance(app.Name, instance.Proto, instance.ModifyTime.UnixNano()/1e6)
instanceInfo.RealInstances[instance.Revision()] = instance
status := instanceInfo.Status
app.StatusCounts[status] = app.StatusCounts[status] + 1
Expand Down Expand Up @@ -274,19 +271,29 @@ func (a *ApplicationsBuilder) buildDeltaApps(oldAppsCache *ApplicationsRespCache
return constructResponseCache(newDeltaApps, instCount, true)
}

func parseStatus(instance *model.Instance) string {
if instance.Proto.GetIsolate().GetValue() {
func parseStatus(instance *api.Instance) string {
if instance.GetIsolate().GetValue() {
return StatusOutOfService
}
return StatusUp
}

func parsePortWrapper(info *InstanceInfo, instance *model.Instance) {
securePort, securePortOk := instance.Metadata()[MetadataSecurePort]
securePortEnabled, securePortEnabledOk := instance.Metadata()[MetadataSecurePortEnabled]
insecurePort, insecurePortOk := instance.Metadata()[MetadataInsecurePort]
insecurePortEnabled, insecurePortEnabledOk := instance.Metadata()[MetadataInsecurePortEnabled]

func parsePortWrapper(info *InstanceInfo, instance *api.Instance) {
metadata := instance.GetMetadata()
var securePortOk bool
var securePortEnabledOk bool
var securePort string
var securePortEnabled string
var insecurePortOk bool
var insecurePortEnabledOk bool
var insecurePort string
var insecurePortEnabled string
if len(metadata) > 0 {
securePort, securePortOk = instance.GetMetadata()[MetadataSecurePort]
securePortEnabled, securePortEnabledOk = instance.GetMetadata()[MetadataSecurePortEnabled]
insecurePort, insecurePortOk = instance.GetMetadata()[MetadataInsecurePort]
insecurePortEnabled, insecurePortEnabledOk = instance.GetMetadata()[MetadataInsecurePortEnabled]
}
if securePortOk && securePortEnabledOk && insecurePortOk && insecurePortEnabledOk {
// if metadata contains all port/securePort,port.enabled/securePort.enabled
sePort, err := strconv.Atoi(securePort)
Expand Down Expand Up @@ -317,13 +324,13 @@ func parsePortWrapper(info *InstanceInfo, instance *model.Instance) {
info.Port.Port = insePort
info.Port.Enabled = insePortEnabled
} else {
protocol := instance.Proto.GetProtocol().GetValue()
port := instance.Proto.GetPort().GetValue()
protocol := instance.GetProtocol().GetValue()
port := instance.GetPort().GetValue()
if protocol == SecureProtocol {
info.SecurePort.Port = int(port)
info.SecurePort.Enabled = "true"
if len(instance.Metadata()) > 0 {
if insecurePortStr, ok := instance.Metadata()[MetadataInsecurePort]; ok {
if len(metadata) > 0 {
if insecurePortStr, ok := metadata[MetadataInsecurePort]; ok {
insecurePort, _ := strconv.Atoi(insecurePortStr)
if insecurePort > 0 {
info.Port.Port = insecurePort
Expand All @@ -338,9 +345,9 @@ func parsePortWrapper(info *InstanceInfo, instance *model.Instance) {
}
}

func parseLeaseInfo(leaseInfo *LeaseInfo, instance *model.Instance) {
func parseLeaseInfo(leaseInfo *LeaseInfo, instance *api.Instance) {
var (
metadata = instance.Proto.GetMetadata()
metadata = instance.GetMetadata()
durationInSec int
renewIntervalSec int
)
Expand All @@ -362,7 +369,8 @@ func parseLeaseInfo(leaseInfo *LeaseInfo, instance *model.Instance) {
}
}

func buildInstance(app *Application, eurekaInstanceId string, instance *model.Instance) *InstanceInfo {
func buildInstance(appName string, instance *api.Instance, lastModifyTime int64) *InstanceInfo {
eurekaInstanceId := instance.GetId().GetValue()
instanceInfo := &InstanceInfo{
CountryId: DefaultCountryIdInt,
Port: &PortWrapper{
Expand All @@ -382,17 +390,17 @@ func buildInstance(app *Application, eurekaInstanceId string, instance *model.In
},
RealInstances: make(map[string]*model.Instance),
}
instanceInfo.AppName = app.Name
instanceInfo.AppName = appName
// 属于eureka注册的实例
instanceInfo.InstanceId = eurekaInstanceId
metadata := instance.Metadata()
metadata := instance.GetMetadata()
if metadata == nil {
metadata = map[string]string{}
}
if hostName, ok := metadata[MetadataHostName]; ok {
instanceInfo.HostName = hostName
}
instanceInfo.IpAddr = instance.Proto.GetHost().GetValue()
instanceInfo.IpAddr = instance.GetHost().GetValue()
instanceInfo.Status = parseStatus(instance)
instanceInfo.OverriddenStatus = StatusUnknown
parsePortWrapper(instanceInfo, instance)
Expand Down Expand Up @@ -435,22 +443,22 @@ func buildInstance(app *Application, eurekaInstanceId string, instance *model.In
instanceInfo.SecureVipAddress = address
}
if instanceInfo.VipAddress == "" {
instanceInfo.VipAddress = app.Name
instanceInfo.VipAddress = appName
}
if instanceInfo.HostName == "" {
instanceInfo.HostName = instance.Proto.GetHost().GetValue()
instanceInfo.HostName = instance.GetHost().GetValue()
}
buildLocationInfo(instanceInfo, instance)
instanceInfo.LastUpdatedTimestamp = strconv.Itoa(int(instance.ModifyTime.UnixNano() / 1e6))
instanceInfo.LastUpdatedTimestamp = strconv.Itoa(int(lastModifyTime))
instanceInfo.ActionType = ActionAdded
return instanceInfo
}

func buildLocationInfo(instanceInfo *InstanceInfo, instance *model.Instance) {
func buildLocationInfo(instanceInfo *InstanceInfo, instance *api.Instance) {
var region string
var zone string
var campus string
if location := instance.Location(); location != nil {
if location := instance.GetLocation(); location != nil {
region = location.GetRegion().GetValue()
zone = location.GetZone().GetValue()
campus = location.GetCampus().GetValue()
Expand Down
3 changes: 2 additions & 1 deletion apiserver/eurekaserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const (
optionConnLimit = "connLimit"
optionTLS = "tls"
optionEnableSelfPreservation = "enableSelfPreservation"
optionIgnoreUpLow = "ignoreUpLow"
optionCaseSensitive = "caseSensitive"
optionPeerNodesToReplicate = "peersToReplicate"
)

const (
Expand Down
55 changes: 42 additions & 13 deletions apiserver/eurekaserver/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,27 @@ func (p *PortWrapper) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error
return nil
}

func (p *PortWrapper) convertPortValue() error {
if jsonNumber, ok := p.Port.(json.Number); ok {
func convertIntValue(value interface{}) (int, error) {
if jsonNumber, ok := value.(json.Number); ok {
realPort, err := jsonNumber.Int64()
if err != nil {
return err
return 0, err
}
p.RealPort = int(realPort)
return nil
return int(realPort), nil
}
if floatValue, ok := p.Port.(float64); ok {
p.RealPort = int(floatValue)
return nil
if floatValue, ok := value.(float64); ok {
return int(floatValue), nil
}
if strValue, ok := p.Port.(string); ok {
var err error
p.RealPort, err = strconv.Atoi(strValue)
return err
if strValue, ok := value.(string); ok {
return strconv.Atoi(strValue)
}
return fmt.Errorf("unknow type of port value, type is %v", reflect.TypeOf(p.Port))
return 0, fmt.Errorf("unknow type of port value, type is %v", reflect.TypeOf(value))
}

func (p *PortWrapper) convertPortValue() error {
var err error
p.RealPort, err = convertIntValue(p.Port)
return err
}

func (p *PortWrapper) convertEnableValue() error {
Expand Down Expand Up @@ -404,3 +406,30 @@ func (a *Applications) GetInstance(instId string) *InstanceInfo {

// StringMap is a map[string]string.
type StringMap map[string]interface{}

// ReplicationInstance request for instance replicate
type ReplicationInstance struct {
AppName string `json:"appName"`
Id string `json:"id"`
LastDirtyTimestamp int64 `json:"lastDirtyTimestamp"`
OverriddenStatus string `json:"overriddenStatus"`
Status string `json:"status"`
InstanceInfo *InstanceInfo `json:"instanceInfo"`
Action string `json:"action"`
}

// ReplicationList instances list to replicate
type ReplicationList struct {
ReplicationList []*ReplicationInstance `json:"replicationList"`
}

// ReplicationInstanceResponse response for instance replicate process
type ReplicationInstanceResponse struct {
StatusCode int `json:"statusCode"`
ResponseEntity *InstanceInfo `json:"responseEntity"`
}

// ReplicationListResponse list for replicate instance response
type ReplicationListResponse struct {
ResponseList []*ReplicationInstanceResponse `json:"responseList"`
}
Loading