Skip to content

Commit

Permalink
Add service verification to reachability check
Browse files Browse the repository at this point in the history
Reachability check first checks if port is open

If port is open, verify the expected service is available based on port specification using grpc reflection
This will catch situations where port is open, but service backend is configured wrong or unreachable.
  • Loading branch information
pschork committed Jan 22, 2025
1 parent a31001f commit 4ccea2c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 5 deletions.
61 changes: 59 additions & 2 deletions disperser/dataapi/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/semver"
"github.com/Layr-Labs/eigenda/operators"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection/grpc_reflection_v1"
)

// OperatorHandler handles operations to collect and process operators info.
Expand Down Expand Up @@ -44,10 +47,18 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st

operatorSocket := core.OperatorSocket(operatorInfo.Socket)
retrievalSocket := operatorSocket.GetRetrievalSocket()
retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, oh.logger)
retrievalPortOpen := checkIsOperatorPortOpen(retrievalSocket, 3, oh.logger)
retrievalOnline, retrievalStatus := false, "port closed"
if retrievalPortOpen {
retrievalOnline, retrievalStatus = checkServiceOnline(ctx, "node.Retrieval", retrievalSocket, 3*time.Second)
}

dispersalSocket := operatorSocket.GetV1DispersalSocket()
dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, oh.logger)
dispersalPortOpen := checkIsOperatorPortOpen(dispersalSocket, 3, oh.logger)
dispersalOnline, dispersalStatus := false, "port closed"
if dispersalPortOpen {
dispersalOnline, dispersalStatus = checkServiceOnline(ctx, "node.Dispersal", dispersalSocket, 3*time.Second)
}

// Create the metadata regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
Expand All @@ -56,6 +67,8 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st
RetrievalSocket: retrievalSocket,
DispersalOnline: dispersalOnline,
RetrievalOnline: retrievalOnline,
DispersalStatus: dispersalStatus,
RetrievalStatus: retrievalStatus,
}

// Log the online status
Expand All @@ -65,6 +78,50 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st
return portCheckResponse, nil
}

// query operator host info endpoint if available
func checkServiceOnline(ctx context.Context, serviceName string, socket string, timeout time.Duration) (bool, string) {
conn, err := grpc.NewClient(socket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return false, err.Error()
}
defer conn.Close()
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// Create a reflection client
reflectionClient := grpc_reflection_v1.NewServerReflectionClient(conn)

// Send ListServices request
stream, err := reflectionClient.ServerReflectionInfo(ctxWithTimeout)
if err != nil {
return false, err.Error()
}

// Send the ListServices request
listReq := &grpc_reflection_v1.ServerReflectionRequest{
MessageRequest: &grpc_reflection_v1.ServerReflectionRequest_ListServices{},
}
if err := stream.Send(listReq); err != nil {
return false, err.Error()
}

// Get the response
r, err := stream.Recv()
if err != nil {
return false, err.Error()
}

// Check if the service exists
if list := r.GetListServicesResponse(); list != nil {
for _, service := range list.GetService() {
if service.GetName() == serviceName {
return true, "available"
}
}
}
return false, "unavailable"
}

func (oh *OperatorHandler) GetOperatorsStake(ctx context.Context, operatorId string) (*OperatorsStakeResponse, error) {
currentBlock, err := oh.indexedChainState.GetCurrentBlockNumber()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions disperser/dataapi/queried_operators_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat
var socket string
if operatorStatus.IndexedOperatorInfo != nil {
socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket()
isOnline = checkIsOperatorOnline(socket, 10, logger)
isOnline = checkIsOperatorPortOpen(socket, 10, logger)
}

// Log the online status
Expand Down Expand Up @@ -245,8 +245,8 @@ func ValidOperatorIP(address string, logger logging.Logger) bool {
return isValid
}

// method to check if operator is online via socket dial
func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool {
// method to check if operator port is open
func checkIsOperatorPortOpen(socket string, timeoutSecs int, logger logging.Logger) bool {
if !ValidOperatorIP(socket, logger) {
logger.Error("port check blocked invalid operator IP", "socket", socket)
return false
Expand Down
2 changes: 2 additions & 0 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ type (
RetrievalSocket string `json:"retrieval_socket"`
DispersalOnline bool `json:"dispersal_online"`
RetrievalOnline bool `json:"retrieval_online"`
DispersalStatus string `json:"dispersal_status"`
RetrievalStatus string `json:"retrieval_status"`
}
SemverReportResponse struct {
Semver map[string]*semver.SemverMetrics `json:"semver"`
Expand Down

0 comments on commit 4ccea2c

Please sign in to comment.