diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index d53fb90ee3..3b82026925 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -777,19 +777,19 @@ const docTemplate = `{ "dataapi.OperatorPortCheckResponse": { "type": "object", "properties": { - "disperser_online": { + "dispersal_online": { "type": "boolean" }, - "disperser_socket": { + "dispersal_socket": { "type": "string" }, "operator_id": { "type": "string" }, - "retriever_online": { + "retrieval_online": { "type": "boolean" }, - "retriever_socket": { + "retrieval_socket": { "type": "string" } } diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index 9398178ab8..e6a8188c04 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -773,19 +773,19 @@ "dataapi.OperatorPortCheckResponse": { "type": "object", "properties": { - "disperser_online": { + "dispersal_online": { "type": "boolean" }, - "disperser_socket": { + "dispersal_socket": { "type": "string" }, "operator_id": { "type": "string" }, - "retriever_online": { + "retrieval_online": { "type": "boolean" }, - "retriever_socket": { + "retrieval_socket": { "type": "string" } } diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index d0dc582c1b..6f5e7aaac2 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -140,15 +140,15 @@ definitions: type: object dataapi.OperatorPortCheckResponse: properties: - disperser_online: + dispersal_online: type: boolean - disperser_socket: + dispersal_socket: type: string operator_id: type: string - retriever_online: + retrieval_online: type: boolean - retriever_socket: + retrieval_socket: type: string type: object dataapi.OperatorsNonsigningPercentage: diff --git a/disperser/dataapi/operator_handlers.go b/disperser/dataapi/operator_handlers.go index f6382b5405..8e9e85084a 100644 --- a/disperser/dataapi/operator_handlers.go +++ b/disperser/dataapi/operator_handlers.go @@ -5,6 +5,7 @@ import ( "errors" "net" "sort" + "strings" "time" "github.com/Layr-Labs/eigenda/core" @@ -81,7 +82,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat var socket string if operatorStatus.IndexedOperatorInfo != nil { socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket() - isOnline = checkIsOperatorOnline(socket) + isOnline = checkIsOperatorOnline(socket, 10, logger) } // Log the online status @@ -104,31 +105,51 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat operatorOnlineStatusresultsChan <- metadata } +// Check that the socketString is not private/unspecified +func ValidOperatorIP(socketString string, logger logging.Logger) bool { + host := strings.Split(socketString, ":")[0] + ips, err := net.LookupIP(host) + if err != nil { + logger.Error("Error resolving operator host IP", "host", host, "error", err) + return false + } + ipAddr := ips[0] + if ipAddr == nil { + logger.Error("IP address is nil", "host", host, "ips", ips) + return false + } + isValid := !ipAddr.IsPrivate() && !ipAddr.IsUnspecified() + logger.Debug("Operator IP validation", "socketString", socketString, "host", host, "ips", ips, "ipAddr", ipAddr, "isValid", isValid) + + return isValid +} + func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) { operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId) if err != nil { - s.logger.Warn("Failed to fetch operator info", "error", err) + s.logger.Warn("failed to fetch operator info", "error", err) return &OperatorPortCheckResponse{}, errors.New("not found") } - retrieverSocket := core.OperatorSocket(operatorInfo.Socket).GetRetrievalSocket() - retrieverOnline := checkIsOperatorOnline(retrieverSocket) - - disperserSocket := core.OperatorSocket(operatorInfo.Socket).GetDispersalSocket() - disperserOnline := checkIsOperatorOnline(disperserSocket) + operatorSocket := core.OperatorSocket(operatorInfo.Socket) + retrievalSocket := operatorSocket.GetRetrievalSocket() + retrievalOnline := checkIsOperatorOnline(retrievalSocket, 3, s.logger) - // Log the online status - s.logger.Info("Operator port status", "retrieverOnline", retrieverOnline, "retrieverSocket", retrieverSocket, "disperserOnline", disperserOnline, "disperserSocket", disperserSocket) + dispersalSocket := operatorSocket.GetDispersalSocket() + dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger) // Create the metadata regardless of online status portCheckResponse := &OperatorPortCheckResponse{ OperatorId: operatorId, - DisperserSocket: disperserSocket, - RetrieverSocket: retrieverSocket, - DisperserOnline: disperserOnline, - RetrieverOnline: retrieverOnline, + DispersalSocket: dispersalSocket, + RetrievalSocket: retrievalSocket, + DispersalOnline: dispersalOnline, + RetrievalOnline: retrievalOnline, } + // Log the online status + s.logger.Info("operator port check response", portCheckResponse) + // Send the metadata to the results channel return portCheckResponse, nil } @@ -136,10 +157,15 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op // method to check if operator is online // Note: This method is least intrusive way to check if operator is online // AlternateSolution: Should we add an endpt to check if operator is online? -func checkIsOperatorOnline(socket string) bool { - timeout := time.Second * 10 +func checkIsOperatorOnline(socket string, timeoutSecs int, logger logging.Logger) bool { + if !ValidOperatorIP(socket, logger) { + logger.Error("port check blocked invalid operator IP", "socket", socket) + return false + } + timeout := time.Second * time.Duration(timeoutSecs) conn, err := net.DialTimeout("tcp", socket, timeout) if err != nil { + logger.Warn("port check timeout", "socket", socket, "timeout", timeoutSecs, "error", err) return false } defer conn.Close() // Close the connection after checking diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 1b585dc941..ac7fffa6e2 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -146,10 +146,10 @@ type ( OperatorPortCheckResponse struct { OperatorId string `json:"operator_id"` - DisperserSocket string `json:"disperser_socket"` - RetrieverSocket string `json:"retriever_socket"` - DisperserOnline bool `json:"disperser_online"` - RetrieverOnline bool `json:"retriever_online"` + DispersalSocket string `json:"dispersal_socket"` + RetrievalSocket string `json:"retrieval_socket"` + DispersalOnline bool `json:"dispersal_online"` + RetrievalOnline bool `json:"retrieval_online"` } ErrorResponse struct { Error string `json:"error"` @@ -687,15 +687,15 @@ func (s *server) OperatorPortCheck(c *gin.Context) { defer timer.ObserveDuration() operatorId := c.DefaultQuery("operator_id", "") - s.logger.Info("Checking operator ports", "operatorId", operatorId) + s.logger.Info("checking operator ports", "operatorId", operatorId) portCheckResponse, err := s.probeOperatorPorts(c.Request.Context(), operatorId) if err != nil { if strings.Contains(err.Error(), "not found") { err = errNotFound - s.logger.Warn("Operator not found", "operatorId", operatorId) + s.logger.Warn("operator not found", "operatorId", operatorId) s.metrics.IncrementNotFoundRequestNum("OperatorPortCheck") } else { - s.logger.Error("Operator port check failed", "error", err) + s.logger.Error("operator port check failed", "error", err) s.metrics.IncrementFailedRequestNum("OperatorPortCheck") } errorResponse(c, err) diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 61b658ddd5..a0c0972c95 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -455,6 +455,52 @@ func getEjector(t *testing.T) *ejectorComponents { } } +func TestPortCheckIpValidation(t *testing.T) { + assert.Equal(t, false, dataapi.ValidOperatorIP("", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("0.0.0.0:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("10.0.0.1:32005", mockLogger)) + assert.Equal(t, false, dataapi.ValidOperatorIP("::ffff:192.0.2.1:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("localhost:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("127.0.0.1:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("23.93.76.1:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("google.com:32005", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("google.com", mockLogger)) + assert.Equal(t, true, dataapi.ValidOperatorIP("2606:4700:4400::ac40:98f1:32005", mockLogger)) +} + +func TestPortCheck(t *testing.T) { + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil + r := setUpRouter() + operator_id := "0xa96bfb4a7ca981ad365220f336dc5a3de0816ebd5130b79bbc85aca94bc9b6ab" + mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(operatorInfo, nil) + r.GET("/v1/operators-info/port-check", testDataApiServer.OperatorPortCheck) + w := httptest.NewRecorder() + reqStr := fmt.Sprintf("/v1/operators-info/port-check?operator_id=%v", operator_id) + req := httptest.NewRequest(http.MethodGet, reqStr, nil) + ctxWithDeadline, cancel := context.WithTimeout(req.Context(), 500*time.Microsecond) + defer cancel() + req = req.WithContext(ctxWithDeadline) + r.ServeHTTP(w, req) + assert.Equal(t, w.Code, http.StatusOK) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.OperatorPortCheckResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + assert.Equal(t, "23.93.76.1:32005", response.DispersalSocket) + assert.Equal(t, false, response.DispersalOnline) + assert.Equal(t, "23.93.76.1:32006", response.RetrievalSocket) + assert.Equal(t, false, response.RetrievalOnline) +} + func TestCheckBatcherHealthExpectServing(t *testing.T) { r := setUpRouter() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) @@ -598,6 +644,9 @@ func TestChurnerServiceAvailabilityHandler(t *testing.T) { func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil + defer goleak.VerifyNone(t) r := setUpRouter() diff --git a/disperser/dataapi/subgraph_client_test.go b/disperser/dataapi/subgraph_client_test.go index 039f5e5537..8b2039bc78 100644 --- a/disperser/dataapi/subgraph_client_test.go +++ b/disperser/dataapi/subgraph_client_test.go @@ -119,6 +119,25 @@ var ( }, } + operatorInfo = &subgraph.IndexedOperatorInfo{ + Id: "0xa96bfb4a7ca981ad365220f336dc5a3de0816ebd5130b79bbc85aca94bc9b6ac", + PubkeyG1_X: "1336192159512049190945679273141887248666932624338963482128432381981287252980", + PubkeyG1_Y: "25195175002875833468883745675063986308012687914999552116603423331534089122704", + PubkeyG2_X: []graphql.String{ + "31597023645215426396093421944506635812143308313031252511177204078669540440732", + "21405255666568400552575831267661419473985517916677491029848981743882451844775", + }, + PubkeyG2_Y: []graphql.String{ + "8416989242565286095121881312760798075882411191579108217086927390793923664442", + "23612061731370453436662267863740141021994163834412349567410746669651828926551", + }, + SocketUpdates: []subgraph.SocketUpdates{ + { + Socket: "23.93.76.1:32005;32006", + }, + }, + } + operatorAddedToQuorum = []*subgraph.OperatorQuorum{ { Operator: "operator-2",