Skip to content

Commit

Permalink
Adds isolated v2 reachability check
Browse files Browse the repository at this point in the history
Updates dataAPI reachability status formatting
  • Loading branch information
pschork committed Jan 24, 2025
1 parent d2f0be3 commit 3537c1c
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 75 deletions.
90 changes: 60 additions & 30 deletions disperser/dataapi/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewOperatorHandler(logger logging.Logger, metrics *Metrics, chainReader cor
}
}

func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
func (oh *OperatorHandler) ProbeV2OperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
operatorInfo, err := oh.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId)
if err != nil {
oh.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err)
Expand All @@ -51,45 +51,75 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st
retrievalPortOpen := checkIsOperatorPortOpen(retrievalSocket, 3, oh.logger)
retrievalOnline, retrievalStatus := false, "port closed or unreachable"
if retrievalPortOpen {
retrievalOnline, retrievalStatus = checkServiceOnline(ctx, "node.Retrieval", retrievalSocket, 3*time.Second)
}

v1DispersalSocket := operatorSocket.GetV1DispersalSocket()
v1DispersalPortOpen := checkIsOperatorPortOpen(v1DispersalSocket, 3, oh.logger)
v1DispersalOnline, v1DispersalStatus := false, "port closed or unreachable"
if v1DispersalPortOpen {
v1DispersalOnline, v1DispersalStatus = checkServiceOnline(ctx, "node.Dispersal", v1DispersalSocket, 3*time.Second)
retrievalOnline, retrievalStatus = checkServiceOnline(ctx, "node.v2.Retrieval", retrievalSocket, 3*time.Second)
}

v2DispersalOnline, v2DispersalStatus := false, ""
v2DispersalSocket := operatorSocket.GetV2DispersalSocket()
if v2DispersalSocket == "" {
v2DispersalStatus = "v2 dispersal port is not registered"
dispersalOnline, dispersalStatus := false, ""
dispersalSocket := operatorSocket.GetV2DispersalSocket()
if dispersalSocket == "" {
dispersalStatus = "v2 dispersal port is not registered"
} else {
v2DispersalPortOpen := checkIsOperatorPortOpen(v2DispersalSocket, 3, oh.logger)
if !v2DispersalPortOpen {
v2DispersalStatus = "port closed or unreachable"
dispersalPortOpen := checkIsOperatorPortOpen(dispersalSocket, 3, oh.logger)
if !dispersalPortOpen {
dispersalStatus = "port closed or unreachable"
} else {
v2DispersalOnline, v2DispersalStatus = checkServiceOnline(ctx, "node.v2.Dispersal", v2DispersalSocket, 3*time.Second)
dispersalOnline, dispersalStatus = checkServiceOnline(ctx, "node.v2.Dispersal", dispersalSocket, 3*time.Second)
}
}

// Create the metadata regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
OperatorId: operatorId,
DispersalSocket: v1DispersalSocket,
DispersalStatus: v1DispersalStatus,
DispersalOnline: v1DispersalOnline,
V2DispersalSocket: v2DispersalSocket,
V2DispersalOnline: v2DispersalOnline,
V2DispersalStatus: v2DispersalStatus,
RetrievalSocket: retrievalSocket,
RetrievalOnline: retrievalOnline,
RetrievalStatus: retrievalStatus,
OperatorId: operatorId,
DispersalSocket: dispersalSocket,
DispersalStatus: dispersalStatus,
DispersalOnline: dispersalOnline,
RetrievalSocket: retrievalSocket,
RetrievalOnline: retrievalOnline,
RetrievalStatus: retrievalStatus,
}

// Log the online status
oh.logger.Info("v2 operator port check response", "response", portCheckResponse)

// Send the metadata to the results channel
return portCheckResponse, nil
}

func (oh *OperatorHandler) ProbeV1OperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
operatorInfo, err := oh.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId)
if err != nil {
oh.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err)
return &OperatorPortCheckResponse{}, err
}

operatorSocket := core.OperatorSocket(operatorInfo.Socket)
retrievalSocket := operatorSocket.GetRetrievalSocket()
retrievalPortOpen := checkIsOperatorPortOpen(retrievalSocket, 3, oh.logger)
retrievalOnline, retrievalStatus := false, "port closed or UNREACHABLE"
if retrievalPortOpen {
retrievalOnline, retrievalStatus = checkServiceOnline(ctx, "node.Retrieval", retrievalSocket, 3*time.Second)
}

dispersalSocket := operatorSocket.GetV1DispersalSocket()
dispersalPortOpen := checkIsOperatorPortOpen(dispersalSocket, 3, oh.logger)
dispersalOnline, dispersalStatus := false, "port closed or UNREACHABLE"
if dispersalPortOpen {
dispersalOnline, dispersalStatus = checkServiceOnline(ctx, "node.Dispersal", dispersalSocket, 3*time.Second)
}

// Create the metadata regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
OperatorId: operatorId,
DispersalSocket: dispersalSocket,
DispersalStatus: dispersalStatus,
DispersalOnline: dispersalOnline,
RetrievalSocket: retrievalSocket,
RetrievalOnline: retrievalOnline,
RetrievalStatus: retrievalStatus,
}

// Log the online status
oh.logger.Info("operator port check response", "response", portCheckResponse)
oh.logger.Info("v1 operator port check response", "response", portCheckResponse)

// Send the metadata to the results channel
return portCheckResponse, nil
Expand Down Expand Up @@ -132,11 +162,11 @@ func checkServiceOnline(ctx context.Context, serviceName string, socket string,
if list := r.GetListServicesResponse(); list != nil {
for _, service := range list.GetService() {
if service.GetName() == serviceName {
return true, fmt.Sprintf("%s is available", serviceName)
return true, fmt.Sprintf("%s is ONLINE", serviceName)
}
}
}
return false, fmt.Sprintf("grpc available but %s service not found at %s", serviceName, socket)
return false, fmt.Sprintf("grpc available but %s service NOT FOUND", serviceName)
}

func (oh *OperatorHandler) GetOperatorsStake(ctx context.Context, operatorId string) (*OperatorsStakeResponse, error) {
Expand Down
21 changes: 9 additions & 12 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,13 @@ type (
}

OperatorPortCheckResponse struct {
OperatorId string `json:"operator_id"`
DispersalSocket string `json:"dispersal_socket"`
DispersalOnline bool `json:"dispersal_online"`
DispersalStatus string `json:"dispersal_status"`
RetrievalSocket string `json:"retrieval_socket"`
RetrievalOnline bool `json:"retrieval_online"`
RetrievalStatus string `json:"retrieval_status"`
V2DispersalSocket string `json:"v2_dispersal_socket"`
V2DispersalOnline bool `json:"v2_dispersal_online"`
V2DispersalStatus string `json:"v2_dispersal_status"`
OperatorId string `json:"operator_id"`
DispersalSocket string `json:"dispersal_socket"`
DispersalOnline bool `json:"dispersal_online"`
DispersalStatus string `json:"dispersal_status"`
RetrievalSocket string `json:"retrieval_socket"`
RetrievalOnline bool `json:"retrieval_online"`
RetrievalStatus string `json:"retrieval_status"`
}
SemverReportResponse struct {
Semver map[string]*semver.SemverMetrics `json:"semver"`
Expand Down Expand Up @@ -916,7 +913,7 @@ func (s *server) FetchOperatorEjections(c *gin.Context) {

// OperatorPortCheck godoc
//
// @Summary Operator node reachability port check
// @Summary Operator v1 node reachability port check
// @Tags OperatorsInfo
// @Produce json
// @Param operator_id query string true "Operator ID"
Expand All @@ -933,7 +930,7 @@ func (s *server) OperatorPortCheck(c *gin.Context) {

operatorId := c.DefaultQuery("operator_id", "")
s.logger.Info("checking operator ports", "operatorId", operatorId)
portCheckResponse, err := s.operatorHandler.ProbeOperatorHosts(c.Request.Context(), operatorId)
portCheckResponse, err := s.operatorHandler.ProbeV1OperatorPorts(c.Request.Context(), operatorId)
if err != nil {
if strings.Contains(err.Error(), "not found") {
err = errNotFound
Expand Down
21 changes: 9 additions & 12 deletions disperser/dataapi/v2/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,13 @@ type (
}

OperatorPortCheckResponse struct {
OperatorId string `json:"operator_id"`
DispersalSocket string `json:"dispersal_socket"`
DispersalOnline bool `json:"dispersal_online"`
DispersalStatus string `json:"dispersal_status"`
RetrievalSocket string `json:"retrieval_socket"`
RetrievalOnline bool `json:"retrieval_online"`
RetrievalStatus string `json:"retrieval_status"`
V2DispersalSocket string `json:"v2_dispersal_socket"`
V2DispersalOnline bool `json:"v2_dispersal_online"`
V2DispersalStatus string `json:"v2_dispersal_status"`
OperatorId string `json:"operator_id"`
DispersalSocket string `json:"dispersal_socket"`
DispersalOnline bool `json:"dispersal_online"`
DispersalStatus string `json:"dispersal_status"`
RetrievalSocket string `json:"retrieval_socket"`
RetrievalOnline bool `json:"retrieval_online"`
RetrievalStatus string `json:"retrieval_status"`
}

SemverReportResponse struct {
Expand Down Expand Up @@ -820,7 +817,7 @@ func (s *ServerV2) FetchOperatorsResponses(c *gin.Context) {

// CheckOperatorsReachability godoc
//
// @Summary Operator node reachability check
// @Summary Operator v2 node reachability check
// @Tags Operators
// @Produce json
// @Param operator_id query string false "Operator ID in hex string [default: all operators if unspecified]"
Expand All @@ -837,7 +834,7 @@ func (s *ServerV2) CheckOperatorsReachability(c *gin.Context) {

operatorId := c.DefaultQuery("operator_id", "")
s.logger.Info("checking operator ports", "operatorId", operatorId)
portCheckResponse, err := s.operatorHandler.ProbeOperatorHosts(c.Request.Context(), operatorId)
portCheckResponse, err := s.operatorHandler.ProbeV2OperatorPorts(c.Request.Context(), operatorId)
if err != nil {
if strings.Contains(err.Error(), "not found") {
err = errNotFound
Expand Down
47 changes: 29 additions & 18 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ import (
const (
// The percentage of time in garbage collection in a GC cycle.
gcPercentageTime = 0.1

v1CheckPath = "api/v1/operators-info/port-check"
v2CheckPath = "api/v2/operators-info/port-check"
)

var (
Expand Down Expand Up @@ -277,12 +280,13 @@ func (n *Node) Start(ctx context.Context) error {
}

go n.expireLoop()
go n.checkNodeReachability()
go n.checkNodeReachability(v1CheckPath)

if n.Config.EnableV2 {
go func() {
_ = n.RefreshOnchainState(ctx)
}()
go n.checkNodeReachability(v2CheckPath)
}

// Build the socket based on the hostname/IP provided in the CLI
Expand Down Expand Up @@ -665,11 +669,13 @@ type OperatorReachabilityResponse struct {
RetrievalSocket string `json:"retrieval_socket"`
DispersalOnline bool `json:"dispersal_online"`
RetrievalOnline bool `json:"retrieval_online"`
DispersalStatus string `json:"dispersal_status"`
RetrievalStatus string `json:"retrieval_status"`
}

func (n *Node) checkNodeReachability() {
func (n *Node) checkNodeReachability(checkPath string) {
if n.Config.ReachabilityPollIntervalSec == 0 {
n.Logger.Warn("Node reachability checks disabled!!! ReachabilityPollIntervalSec set to 0")
n.Logger.Warn("Node reachability checks disabled!")
return
}

Expand All @@ -678,7 +684,12 @@ func (n *Node) checkNodeReachability() {
return
}

checkURL, err := GetReachabilityURL(n.Config.DataApiUrl, n.Config.ID.Hex())
version := "V1"
if strings.Contains(checkPath, "v2") {
version = "V2"
}

checkURL, err := GetReachabilityURL(n.Config.DataApiUrl, checkPath, n.Config.ID.Hex())
if err != nil {
n.Logger.Error("Failed to get reachability check URL", err)
return
Expand All @@ -691,11 +702,11 @@ func (n *Node) checkNodeReachability() {
for {
<-ticker.C

n.Logger.Debug("Calling reachability check", "url", checkURL)
n.Logger.Debug(fmt.Sprintf("Calling %s reachability check", version), "url", checkURL)

resp, err := http.Get(checkURL)
if err != nil {
n.Logger.Error("Reachability check request failed", err)
n.Logger.Error(fmt.Sprintf("Reachability check %s request failed", version), err)
continue
} else if resp.StatusCode == 404 {
body, _ := io.ReadAll(resp.Body)
Expand All @@ -706,13 +717,13 @@ func (n *Node) checkNodeReachability() {
}
continue
} else if resp.StatusCode != 200 {
n.Logger.Error("Reachability check request failed", "status", resp.StatusCode)
n.Logger.Error(fmt.Sprintf("Reachability check %s request failed", version), "status", resp.StatusCode)
continue
}

data, err := io.ReadAll(resp.Body)
if err != nil {
n.Logger.Error("Failed to read reachability check response", err)
n.Logger.Error(fmt.Sprintf("Failed to read %s reachability check response", version), err)
continue
}

Expand All @@ -724,24 +735,24 @@ func (n *Node) checkNodeReachability() {
}

if responseObject.DispersalOnline {
n.Logger.Info("Reachability check - dispersal socket is ONLINE", "socket", responseObject.DispersalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("dispersal").Set(1.0)
n.Logger.Info(fmt.Sprintf("Reachability check %s", version), "status", responseObject.DispersalStatus, "socket", responseObject.DispersalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("dispersal", version).Set(1.0)
} else {
n.Logger.Error("Reachability check - dispersal socket is UNREACHABLE", "socket", responseObject.DispersalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("dispersal").Set(0.0)
n.Logger.Error(fmt.Sprintf("Reachability check %s", version), "status", responseObject.DispersalStatus, "socket", responseObject.DispersalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("dispersal", version).Set(0.0)
}
if responseObject.RetrievalOnline {
n.Logger.Info("Reachability check - retrieval socket is ONLINE", "socket", responseObject.RetrievalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("retrieval").Set(1.0)
n.Logger.Info(fmt.Sprintf("Reachability check %s", version), "status", responseObject.RetrievalStatus, "socket", responseObject.RetrievalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("retrieval", version).Set(1.0)
} else {
n.Logger.Error("Reachability check - retrieval socket is UNREACHABLE", "socket", responseObject.RetrievalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("retrieval").Set(0.0)
n.Logger.Error(fmt.Sprintf("Reachability check %s", version), "status", responseObject.RetrievalStatus, "socket", responseObject.RetrievalSocket)
n.Metrics.ReachabilityGauge.WithLabelValues("retrieval", version).Set(0.0)
}
}
}

func GetReachabilityURL(dataApiUrl, operatorID string) (string, error) {
checkURLString, err := url.JoinPath(dataApiUrl, "/api/v1/operators-info/port-check")
func GetReachabilityURL(dataApiUrl, path, operatorID string) (string, error) {
checkURLString, err := url.JoinPath(dataApiUrl, path)
if err != nil {
return "", err
}
Expand Down
9 changes: 6 additions & 3 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,13 @@ func TestNodeStartOperatorIDDoesNotMatch(t *testing.T) {
}

func TestGetReachabilityURL(t *testing.T) {
url, err := node.GetReachabilityURL("https://dataapi.eigenda.xyz/", "123123123")
v1CheckPath := "api/v1/operators-info/port-check"
url, err := node.GetReachabilityURL("https://dataapi.eigenda.xyz/", v1CheckPath, "123123123")
assert.NoError(t, err)
assert.Equal(t, "https://dataapi.eigenda.xyz/api/v1/operators-info/port-check?operator_id=123123123", url)
url, err = node.GetReachabilityURL("https://dataapi.eigenda.xyz", "123123123")

v2CheckPath := "api/v2/operators-info/port-check"
url, err = node.GetReachabilityURL("https://dataapi.eigenda.xyz", v2CheckPath, "123123123")
assert.NoError(t, err)
assert.Equal(t, "https://dataapi.eigenda.xyz/api/v1/operators-info/port-check?operator_id=123123123", url)
assert.Equal(t, "https://dataapi.eigenda.xyz/api/v2/operators-info/port-check?operator_id=123123123", url)
}

0 comments on commit 3537c1c

Please sign in to comment.