From 61ab5128dedda042c6cbf3c98d97b844cd539df4 Mon Sep 17 00:00:00 2001 From: siddimore Date: Thu, 2 May 2024 19:29:09 -0700 Subject: [PATCH] [DataApi] Add Registered Operators endpt (#508) Co-authored-by: Siddharth More --- disperser/dataapi/docs/docs.go | 107 ++++++++---- disperser/dataapi/docs/swagger.json | 109 ++++++++---- disperser/dataapi/docs/swagger.yaml | 71 +++++--- ...dlers.go => queried_operators_handlers.go} | 57 ++++++- disperser/dataapi/server.go | 60 ++++++- disperser/dataapi/server_test.go | 156 +++++++++++------- disperser/dataapi/subgraph/queries.go | 8 +- disperser/dataapi/subgraph_client.go | 99 +++++++++-- disperser/dataapi/subgraph_client_test.go | 56 ++++++- disperser/dataapi/utils.go | 22 ++- 10 files changed, 538 insertions(+), 207 deletions(-) rename disperser/dataapi/{operator_handlers.go => queried_operators_handlers.go} (70%) diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 2146a9ce8e..cc65c2114e 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -500,7 +500,7 @@ const docTemplate = `{ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/dataapi.DeregisteredOperatorsResponse" + "$ref": "#/definitions/dataapi.QueriedStateOperatorsResponse" } }, "400": { @@ -569,6 +569,43 @@ const docTemplate = `{ } } } + }, + "/operators-info/registered-operators": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsInfo" + ], + "summary": "Fetch list of operators that have been registered for days. Days is a query parameter with a default value of 14 and max value of 30.", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.QueriedStateOperatorsResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } } }, "definitions": { @@ -662,40 +699,6 @@ const docTemplate = `{ } } }, - "dataapi.DeregisteredOperatorMetadata": { - "type": "object", - "properties": { - "block_number": { - "type": "integer" - }, - "is_online": { - "type": "boolean" - }, - "operator_id": { - "type": "string" - }, - "operator_process_error": { - "type": "string" - }, - "socket": { - "type": "string" - } - } - }, - "dataapi.DeregisteredOperatorsResponse": { - "type": "object", - "properties": { - "data": { - "type": "array", - "items": { - "$ref": "#/definitions/dataapi.DeregisteredOperatorMetadata" - } - }, - "meta": { - "$ref": "#/definitions/dataapi.Meta" - } - } - }, "dataapi.ErrorResponse": { "type": "object", "properties": { @@ -808,6 +811,40 @@ const docTemplate = `{ } } }, + "dataapi.QueriedStateOperatorMetadata": { + "type": "object", + "properties": { + "block_number": { + "type": "integer" + }, + "is_online": { + "type": "boolean" + }, + "operator_id": { + "type": "string" + }, + "operator_process_error": { + "type": "string" + }, + "socket": { + "type": "string" + } + } + }, + "dataapi.QueriedStateOperatorsResponse": { + "type": "object", + "properties": { + "data": { + "type": "array", + "items": { + "$ref": "#/definitions/dataapi.QueriedStateOperatorMetadata" + } + }, + "meta": { + "$ref": "#/definitions/dataapi.Meta" + } + } + }, "dataapi.ServiceAvailability": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index bdfe55cf08..3f0f3c421e 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -496,7 +496,7 @@ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/dataapi.DeregisteredOperatorsResponse" + "$ref": "#/definitions/dataapi.QueriedStateOperatorsResponse" } }, "400": { @@ -565,6 +565,43 @@ } } } + }, + "/operators-info/registered-operators": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "OperatorsInfo" + ], + "summary": "Fetch list of operators that have been registered for days. Days is a query parameter with a default value of 14 and max value of 30.", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.QueriedStateOperatorsResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } } }, "definitions": { @@ -658,40 +695,6 @@ } } }, - "dataapi.DeregisteredOperatorMetadata": { - "type": "object", - "properties": { - "block_number": { - "type": "integer" - }, - "is_online": { - "type": "boolean" - }, - "operator_id": { - "type": "string" - }, - "operator_process_error": { - "type": "string" - }, - "socket": { - "type": "string" - } - } - }, - "dataapi.DeregisteredOperatorsResponse": { - "type": "object", - "properties": { - "data": { - "type": "array", - "items": { - "$ref": "#/definitions/dataapi.DeregisteredOperatorMetadata" - } - }, - "meta": { - "$ref": "#/definitions/dataapi.Meta" - } - } - }, "dataapi.ErrorResponse": { "type": "object", "properties": { @@ -804,6 +807,40 @@ } } }, + "dataapi.QueriedStateOperatorMetadata": { + "type": "object", + "properties": { + "block_number": { + "type": "integer" + }, + "is_online": { + "type": "boolean" + }, + "operator_id": { + "type": "string" + }, + "operator_process_error": { + "type": "string" + }, + "socket": { + "type": "string" + } + } + }, + "dataapi.QueriedStateOperatorsResponse": { + "type": "object", + "properties": { + "data": { + "type": "array", + "items": { + "$ref": "#/definitions/dataapi.QueriedStateOperatorMetadata" + } + }, + "meta": { + "$ref": "#/definitions/dataapi.Meta" + } + } + }, "dataapi.ServiceAvailability": { "type": "object", "properties": { @@ -915,4 +952,4 @@ } } } -} +} \ No newline at end of file diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index 5c0a05b125..b424c07acf 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -66,28 +66,6 @@ definitions: meta: $ref: '#/definitions/dataapi.Meta' type: object - dataapi.DeregisteredOperatorMetadata: - properties: - block_number: - type: integer - is_online: - type: boolean - operator_id: - type: string - operator_process_error: - type: string - socket: - type: string - type: object - dataapi.DeregisteredOperatorsResponse: - properties: - data: - items: - $ref: '#/definitions/dataapi.DeregisteredOperatorMetadata' - type: array - meta: - $ref: '#/definitions/dataapi.Meta' - type: object dataapi.ErrorResponse: properties: error: @@ -160,6 +138,28 @@ definitions: meta: $ref: '#/definitions/dataapi.Meta' type: object + dataapi.QueriedStateOperatorMetadata: + properties: + block_number: + type: integer + is_online: + type: boolean + operator_id: + type: string + operator_process_error: + type: string + socket: + type: string + type: object + dataapi.QueriedStateOperatorsResponse: + properties: + data: + items: + $ref: '#/definitions/dataapi.QueriedStateOperatorMetadata' + type: array + meta: + $ref: '#/definitions/dataapi.Meta' + type: object dataapi.ServiceAvailability: properties: service_name: @@ -557,7 +557,7 @@ paths: "200": description: OK schema: - $ref: '#/definitions/dataapi.DeregisteredOperatorsResponse' + $ref: '#/definitions/dataapi.QueriedStateOperatorsResponse' "400": description: 'error: Bad request' schema: @@ -604,6 +604,31 @@ paths: summary: Operator node reachability port check tags: - OperatorsInfo + /operators-info/registered-operators: + get: + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.QueriedStateOperatorsResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Fetch list of operators that have been registered for days. Days is + a query parameter with a default value of 14 and max value of 30. + tags: + - OperatorsInfo schemes: - https - http diff --git a/disperser/dataapi/operator_handlers.go b/disperser/dataapi/queried_operators_handlers.go similarity index 70% rename from disperser/dataapi/operator_handlers.go rename to disperser/dataapi/queried_operators_handlers.go index b293747c37..0714c1016a 100644 --- a/disperser/dataapi/operator_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -22,14 +22,18 @@ var ( // TODO: Poolsize should be configurable // Observe performance and tune accordingly poolSize = 50 - operatorOnlineStatusresultsChan chan *DeregisteredOperatorMetadata + operatorOnlineStatusresultsChan chan *QueriedStateOperatorMetadata ) -func (s *server) getDeregisteredOperatorForDays(ctx context.Context, days int32) ([]*DeregisteredOperatorMetadata, error) { +// Function to get registered operators for given number of days +// Queries subgraph for deregistered operators +// Process operator online status +// Returns list of Operators with their online status, socket address and block number they deregistered +func (s *server) getDeregisteredOperatorForDays(ctx context.Context, days int32) ([]*QueriedStateOperatorMetadata, error) { // Track time taken to get deregistered operators startTime := time.Now() - indexedDeregisteredOperatorState, err := s.subgraphClient.QueryIndexedDeregisteredOperatorsForTimeWindow(ctx, days) + indexedDeregisteredOperatorState, err := s.subgraphClient.QueryIndexedOperatorsWithStateForTimeWindow(ctx, days, Deregistered) if err != nil { return nil, err } @@ -37,11 +41,11 @@ func (s *server) getDeregisteredOperatorForDays(ctx context.Context, days int32) // Convert the map to a slice. operators := indexedDeregisteredOperatorState.Operators - operatorOnlineStatusresultsChan = make(chan *DeregisteredOperatorMetadata, len(operators)) + operatorOnlineStatusresultsChan = make(chan *QueriedStateOperatorMetadata, len(operators)) processOperatorOnlineCheck(indexedDeregisteredOperatorState, operatorOnlineStatusresultsChan, s.logger) // Collect results of work done - DeregisteredOperatorMetadata := make([]*DeregisteredOperatorMetadata, 0, len(operators)) + DeregisteredOperatorMetadata := make([]*QueriedStateOperatorMetadata, 0, len(operators)) for range operators { metadata := <-operatorOnlineStatusresultsChan DeregisteredOperatorMetadata = append(DeregisteredOperatorMetadata, metadata) @@ -56,8 +60,43 @@ func (s *server) getDeregisteredOperatorForDays(ctx context.Context, days int32) return DeregisteredOperatorMetadata, nil } -func processOperatorOnlineCheck(deregisteredOperatorState *IndexedDeregisteredOperatorState, operatorOnlineStatusresultsChan chan<- *DeregisteredOperatorMetadata, logger logging.Logger) { - operators := deregisteredOperatorState.Operators +// Function to get registered operators for given number of days +// Queries subgraph for registered operators +// Process operator online status +// Returns list of Operators with their online status, socket address and block number they registered +func (s *server) getRegisteredOperatorForDays(ctx context.Context, days int32) ([]*QueriedStateOperatorMetadata, error) { + // Track time taken to get registered operators + startTime := time.Now() + + indexedRegisteredOperatorState, err := s.subgraphClient.QueryIndexedOperatorsWithStateForTimeWindow(ctx, days, Registered) + if err != nil { + return nil, err + } + + // Convert the map to a slice. + operators := indexedRegisteredOperatorState.Operators + + operatorOnlineStatusresultsChan = make(chan *QueriedStateOperatorMetadata, len(operators)) + processOperatorOnlineCheck(indexedRegisteredOperatorState, operatorOnlineStatusresultsChan, s.logger) + + // Collect results of work done + RegisteredOperatorMetadata := make([]*QueriedStateOperatorMetadata, 0, len(operators)) + for range operators { + metadata := <-operatorOnlineStatusresultsChan + RegisteredOperatorMetadata = append(RegisteredOperatorMetadata, metadata) + } + + // Log the time taken + s.logger.Info("Time taken to get registered operators for days", "duration", time.Since(startTime)) + sort.Slice(RegisteredOperatorMetadata, func(i, j int) bool { + return RegisteredOperatorMetadata[i].BlockNumber < RegisteredOperatorMetadata[j].BlockNumber + }) + + return RegisteredOperatorMetadata, nil +} + +func processOperatorOnlineCheck(queriedOperatorsInfo *IndexedQueriedOperatorInfo, operatorOnlineStatusresultsChan chan<- *QueriedStateOperatorMetadata, logger logging.Logger) { + operators := queriedOperatorsInfo.Operators wp := workerpool.New(poolSize) for _, operatorInfo := range operators { @@ -76,7 +115,7 @@ func processOperatorOnlineCheck(deregisteredOperatorState *IndexedDeregisteredOp wp.StopWait() // Wait for all submitted tasks to complete and stop the pool } -func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operatorOnlineStatusresultsChan chan<- *DeregisteredOperatorMetadata, logger logging.Logger) { +func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operatorOnlineStatusresultsChan chan<- *QueriedStateOperatorMetadata, logger logging.Logger) { var isOnline bool var socket string if operatorStatus.IndexedOperatorInfo != nil { @@ -92,7 +131,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat } // Create the metadata regardless of online status - metadata := &DeregisteredOperatorMetadata{ + metadata := &QueriedStateOperatorMetadata{ OperatorId: string(operatorStatus.OperatorInfo.OperatorId[:]), BlockNumber: uint(operatorStatus.OperatorInfo.BlockNumber), Socket: socket, diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 8dd0545c4d..9f53a7e14f 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -117,7 +117,7 @@ type ( Data []*OperatorNonsigningPercentageMetrics `json:"data"` } - DeregisteredOperatorMetadata struct { + QueriedStateOperatorMetadata struct { OperatorId string `json:"operator_id"` BlockNumber uint `json:"block_number"` Socket string `json:"socket"` @@ -125,9 +125,9 @@ type ( OperatorProcessError string `json:"operator_process_error"` } - DeregisteredOperatorsResponse struct { + QueriedStateOperatorsResponse struct { Meta Meta `json:"meta"` - Data []*DeregisteredOperatorMetadata `json:"data"` + Data []*QueriedStateOperatorMetadata `json:"data"` } ServiceAvailability struct { @@ -248,6 +248,7 @@ func (s *server) Start() error { operatorsInfo := v1.Group("/operators-info") { operatorsInfo.GET("/deregistered-operators", s.FetchDeregisteredOperators) + operatorsInfo.GET("/registered-operators", s.FetchRegisteredOperators) operatorsInfo.GET("/port-check", s.OperatorPortCheck) } metrics := v1.Group("/metrics") @@ -624,7 +625,7 @@ func (s *server) FetchOperatorsNonsigningPercentageHandler(c *gin.Context) { // @Summary Fetch list of operators that have been deregistered for days. Days is a query parameter with a default value of 14 and max value of 30. // @Tags OperatorsInfo // @Produce json -// @Success 200 {object} DeregisteredOperatorsResponse +// @Success 200 {object} QueriedStateOperatorsResponse // @Failure 400 {object} ErrorResponse "error: Bad request" // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" @@ -661,7 +662,56 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) { s.metrics.IncrementSuccessfulRequestNum("FetchDeregisteredOperators") c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAage)) - c.JSON(http.StatusOK, DeregisteredOperatorsResponse{ + c.JSON(http.StatusOK, QueriedStateOperatorsResponse{ + Meta: Meta{ + Size: len(operatorMetadatas), + }, + Data: operatorMetadatas, + }) +} + +// FetchRegisteredOperators godoc +// +// @Summary Fetch list of operators that have been registered for days. Days is a query parameter with a default value of 14 and max value of 30. +// @Tags OperatorsInfo +// @Produce json +// @Success 200 {object} QueriedStateOperatorsResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /operators-info/registered-operators [get] +func (s *server) FetchRegisteredOperators(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchRegisteredOperators", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + // Get query parameters + // Default Value 14 days + days := c.DefaultQuery("days", "14") // If not specified, defaults to 14 + + // Convert days to integer + daysInt, err := strconv.Atoi(days) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid 'days' parameter"}) + return + } + + if daysInt > 30 { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid 'days' parameter. Max value is 30"}) + return + } + + operatorMetadatas, err := s.getRegisteredOperatorForDays(c.Request.Context(), int32(daysInt)) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchRegisteredOperators") + errorResponse(c, err) + return + } + + s.metrics.IncrementSuccessfulRequestNum("FetchRegisteredOperators") + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAage)) + c.JSON(http.StatusOK, QueriedStateOperatorsResponse{ Meta: Meta{ Size: len(operatorMetadatas), }, diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 0d62ba3af2..d897ccbd25 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -655,17 +655,16 @@ func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfoNoSocketInfo - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) + mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) @@ -679,7 +678,7 @@ func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -702,11 +701,10 @@ func TestFetchDeregisteredMultipleOperatorsOneWithNoSocketInfoHandler(t *testing r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfoNoSocketInfo indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) // Set up the mock calls for the two operators @@ -714,7 +712,7 @@ func TestFetchDeregisteredMultipleOperatorsOneWithNoSocketInfoHandler(t *testing mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) // Start test server for Operator closeServer, err := startTestGRPCServer("localhost:32009") // Let the OS assign a free port @@ -735,7 +733,7 @@ func TestFetchDeregisteredMultipleOperatorsOneWithNoSocketInfoHandler(t *testing data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -772,17 +770,16 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampHandler(t *testing.T) { r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfoInvalidTimeStamp - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregisteredInvalidTimeStamp, nil) // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) @@ -796,7 +793,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampHandler(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -816,18 +813,17 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampTwoOperatorsHandler(t *tes r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfoInvalidTimeStamp indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregisteredInvalidTimeStampTwoOperator, nil) // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) @@ -841,7 +837,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampTwoOperatorsHandler(t *tes data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -871,11 +867,10 @@ func TestFetchMetricsDeregisteredOperatorHandler(t *testing.T) { r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) // Set up the mock calls for the two operators @@ -883,7 +878,7 @@ func TestFetchMetricsDeregisteredOperatorHandler(t *testing.T) { mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) // Start the test server for Operator 2 closeServer, err := startTestGRPCServer("localhost:32009") @@ -904,7 +899,7 @@ func TestFetchMetricsDeregisteredOperatorHandler(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -940,15 +935,14 @@ func TestFetchDeregisteredOperatorOffline(t *testing.T) { r := setUpRouter() - indexedOperatorState := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorState := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorState[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) + mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) @@ -962,7 +956,7 @@ func TestFetchDeregisteredOperatorOffline(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -989,11 +983,10 @@ func TestFetchDeregisteredOperatorsWithoutDaysQueryParam(t *testing.T) { r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) // Set up the mock calls for the two operators @@ -1001,7 +994,7 @@ func TestFetchDeregisteredOperatorsWithoutDaysQueryParam(t *testing.T) { mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) r.GET("/v1/operators-info/deregistered-operators/", testDataApiServer.FetchDeregisteredOperators) @@ -1015,7 +1008,7 @@ func TestFetchDeregisteredOperatorsWithoutDaysQueryParam(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -1050,16 +1043,15 @@ func TestFetchDeregisteredOperatorInvalidDaysQueryParam(t *testing.T) { r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) + mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) @@ -1093,15 +1085,14 @@ func TestFetchDeregisteredOperatorQueryDaysGreaterThan30(t *testing.T) { r := setUpRouter() - indexedOperatorState := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorState := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorState[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) + mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) @@ -1135,11 +1126,10 @@ func TestFetchDeregisteredOperatorsMultipleOffline(t *testing.T) { r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) // Set up the mock calls for the two operators @@ -1147,7 +1137,7 @@ func TestFetchDeregisteredOperatorsMultipleOffline(t *testing.T) { mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) @@ -1161,7 +1151,7 @@ func TestFetchDeregisteredOperatorsMultipleOffline(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -1197,15 +1187,14 @@ func TestFetchDeregisteredOperatorOnline(t *testing.T) { r := setUpRouter() - indexedOperatorState := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorState := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorState[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) + mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) // Start test server for Operator closeServer, err := startTestGRPCServer("localhost:32007") // Let the OS assign a free port @@ -1226,7 +1215,7 @@ func TestFetchDeregisteredOperatorOnline(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -1249,11 +1238,10 @@ func TestFetchDeregisteredOperatorsMultipleOfflineOnline(t *testing.T) { r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) // Set up the mock calls for the two operators @@ -1261,7 +1249,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineOnline(t *testing.T) { mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) // Start the test server for Operator 2 closeServer, err := startTestGRPCServer("localhost:32009") @@ -1282,7 +1270,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineOnline(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -1318,17 +1306,16 @@ func TestFetchDeregisteredOperatorsMultipleOnline(t *testing.T) { r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) // Start test server for Operator 1 closeServer1, err := startTestGRPCServer("localhost:32007") // Let the OS assign a free port @@ -1356,7 +1343,7 @@ func TestFetchDeregisteredOperatorsMultipleOnline(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -1389,12 +1376,11 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) { r := setUpRouter() - indexedOperatorStates := make(map[core.OperatorID]*subgraph.DeregisteredOperatorInfo) + indexedOperatorStates := make(map[core.OperatorID]*subgraph.OperatorInfo) indexedOperatorStates[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo indexedOperatorStates[core.OperatorID{1}] = subgraphDeregisteredOperatorInfo2 indexedOperatorStates[core.OperatorID{2}] = subgraphDeregisteredOperatorInfo3 - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphThreeOperatorsDeregistered, nil) // Set up the mock calls for the three operators @@ -1403,7 +1389,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) { mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo3, nil).Once() testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) - mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) r.GET("/v1/operators-info/deregistered-operators", testDataApiServer.FetchDeregisteredOperators) @@ -1417,7 +1403,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) { data, err := io.ReadAll(res.Body) assert.NoError(t, err) - var response dataapi.DeregisteredOperatorsResponse + var response dataapi.QueriedStateOperatorsResponse err = json.Unmarshal(data, &response) assert.NoError(t, err) assert.NotNil(t, response) @@ -1451,6 +1437,54 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) { mockSubgraphApi.Calls = nil } +func TestFetchRegisteredOperatorOnline(t *testing.T) { + + defer goleak.VerifyNone(t) + + r := setUpRouter() + + indexedOperatorState := make(map[core.OperatorID]*subgraph.OperatorInfo) + indexedOperatorState[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo + mockSubgraphApi.On("QueryRegisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorRegistered, nil) + mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + + mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) + + // Start test server for Operator + closeServer, err := startTestGRPCServer("localhost:32007") // Let the OS assign a free port + if err != nil { + t.Fatalf("Failed to start test server: %v", err) + } + defer closeServer() // Ensure the server is closed after the test + + r.GET("/v1/operators-info/registered-operators", testDataApiServer.FetchRegisteredOperators) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/operators-info/registered-operators?days=14", nil) + r.ServeHTTP(w, req) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.QueriedStateOperatorsResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + assert.Equal(t, http.StatusOK, res.StatusCode) + assert.Equal(t, 1, response.Meta.Size) + assert.Equal(t, 1, len(response.Data)) + assert.Equal(t, true, response.Data[0].IsOnline) + + // Reset the mock + mockSubgraphApi.ExpectedCalls = nil + mockSubgraphApi.Calls = nil +} + func setUpRouter() *gin.Engine { return gin.Default() } @@ -1550,13 +1584,13 @@ func startTestGRPCServer(address string) (stopFunc func(), err error) { } // Helper to get OperatorData from response -func getOperatorData(operatorMetadtas []*dataapi.DeregisteredOperatorMetadata, operatorId string) dataapi.DeregisteredOperatorMetadata { +func getOperatorData(operatorMetadtas []*dataapi.QueriedStateOperatorMetadata, operatorId string) dataapi.QueriedStateOperatorMetadata { for _, operatorMetadata := range operatorMetadtas { if operatorMetadata.OperatorId == operatorId { return *operatorMetadata } } - return dataapi.DeregisteredOperatorMetadata{} + return dataapi.QueriedStateOperatorMetadata{} } diff --git a/disperser/dataapi/subgraph/queries.go b/disperser/dataapi/subgraph/queries.go index 0fd0fff1fc..9072f6c78b 100644 --- a/disperser/dataapi/subgraph/queries.go +++ b/disperser/dataapi/subgraph/queries.go @@ -1,7 +1,6 @@ package subgraph import ( - "github.com/Layr-Labs/eigenda/core" "github.com/shurcooL/graphql" ) @@ -69,15 +68,12 @@ type ( // Socket is the socket address of the operator, in the form "host:port" SocketUpdates []SocketUpdates `graphql:"socketUpdates(first: 1, orderBy: blockNumber, orderDirection: desc)"` } - DeregisteredOperatorInfo struct { + OperatorInfo struct { IndexedOperatorInfo *IndexedOperatorInfo // BlockNumber is the block number at which the operator was deregistered. - BlockNumber uint + BlockNumber uint32 Metadata *Operator } - IndexedDeregisteredOperatorState struct { - Operators map[core.OperatorID]*DeregisteredOperatorInfo - } queryBatches struct { Batches []*Batches `graphql:"batches(orderDirection: $orderDirection, orderBy: $orderBy, first: $first, skip: $skip)"` diff --git a/disperser/dataapi/subgraph_client.go b/disperser/dataapi/subgraph_client.go index c853796f4a..5665641503 100644 --- a/disperser/dataapi/subgraph_client.go +++ b/disperser/dataapi/subgraph_client.go @@ -19,6 +19,14 @@ const ( maxWorkerPoolSize = 10 ) +// Define the type for the enum. +type OperatorState int + +const ( + Deregistered OperatorState = iota // iota starts at 0 + Registered +) + type ( SubgraphClient interface { QueryBatchesWithLimit(ctx context.Context, limit, skip int) ([]*Batch, error) @@ -26,7 +34,7 @@ type ( QueryBatchNonSigningOperatorIdsInInterval(ctx context.Context, intervalSeconds int64) (map[string]int, error) QueryBatchNonSigningInfoInInterval(ctx context.Context, startTime, endTime int64) ([]*BatchNonSigningInfo, error) QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error) - QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedDeregisteredOperatorState, error) + QueryIndexedOperatorsWithStateForTimeWindow(ctx context.Context, days int32, state OperatorState) (*IndexedQueriedOperatorInfo, error) QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) } Batch struct { @@ -66,15 +74,15 @@ type ( // (ascending by BlockNumber) where the operator was removed from quorums. RemovedFromQuorum map[string][]*OperatorQuorum } - DeregisteredOperatorInfo struct { + QueriedOperatorInfo struct { IndexedOperatorInfo *core.IndexedOperatorInfo // BlockNumber is the block number at which the operator was deregistered. BlockNumber uint Metadata *Operator OperatorProcessError string } - IndexedDeregisteredOperatorState struct { - Operators map[core.OperatorID]*DeregisteredOperatorInfo + IndexedQueriedOperatorInfo struct { + Operators map[core.OperatorID]*QueriedOperatorInfo } NonSigner struct { OperatorId string @@ -234,7 +242,39 @@ func (sc *subgraphClient) QueryOperatorQuorumEvent(ctx context.Context, startBlo }, nil } -func (sc *subgraphClient) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedDeregisteredOperatorState, error) { +func (sc *subgraphClient) QueryIndexedOperatorsWithStateForTimeWindow(ctx context.Context, days int32, state OperatorState) (*IndexedQueriedOperatorInfo, error) { + // Query all operators in the last N days. + lastNDayInSeconds := uint64(time.Now().Add(-time.Duration(days) * 24 * time.Hour).Unix()) + var operators map[core.OperatorID]*QueriedOperatorInfo + if state == Deregistered { + // Get OperatorsInfo for DeRegistered Operators + deregisteredOperators, err := sc.api.QueryDeregisteredOperatorsGreaterThanBlockTimestamp(ctx, lastNDayInSeconds) + if err != nil { + return nil, err + } + + operators = make(map[core.OperatorID]*QueriedOperatorInfo, len(deregisteredOperators)) + getOperatorInfoForQueriedOperators(sc, ctx, operators, deregisteredOperators) + } else if state == Registered { + // Get OperatorsInfo for Registered Operators + registeredOperators, err := sc.api.QueryRegisteredOperatorsGreaterThanBlockTimestamp(ctx, lastNDayInSeconds) + if err != nil { + return nil, err + } + + operators = make(map[core.OperatorID]*QueriedOperatorInfo, len(registeredOperators)) + getOperatorInfoForQueriedOperators(sc, ctx, operators, registeredOperators) + + } else { + return nil, fmt.Errorf("invalid operator state: %d", state) + } + + return &IndexedQueriedOperatorInfo{ + Operators: operators, + }, nil +} + +func (sc *subgraphClient) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedQueriedOperatorInfo, error) { // Query all deregistered operators in the last N days. lastNDayInSeconds := uint64(time.Now().Add(-time.Duration(days) * 24 * time.Hour).Unix()) deregisteredOperators, err := sc.api.QueryDeregisteredOperatorsGreaterThanBlockTimestamp(ctx, lastNDayInSeconds) @@ -242,14 +282,43 @@ func (sc *subgraphClient) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx con return nil, err } - operators := make(map[core.OperatorID]*DeregisteredOperatorInfo, len(deregisteredOperators)) - for i := range deregisteredOperators { - deregisteredOperator := deregisteredOperators[i] - operator, err := convertOperator(deregisteredOperator) + operators := make(map[core.OperatorID]*QueriedOperatorInfo, len(deregisteredOperators)) + // Get OpeatroInfo for DeRegistered Operators + getOperatorInfoForQueriedOperators(sc, ctx, operators, deregisteredOperators) + + return &IndexedQueriedOperatorInfo{ + Operators: operators, + }, nil +} + +func (sc *subgraphClient) QueryIndexedRegisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedQueriedOperatorInfo, error) { + // Query all registered operators in the last N days. + lastNDayInSeconds := uint64(time.Now().Add(-time.Duration(days) * 24 * time.Hour).Unix()) + registeredOperators, err := sc.api.QueryRegisteredOperatorsGreaterThanBlockTimestamp(ctx, lastNDayInSeconds) + if err != nil { + return nil, err + } + + operators := make(map[core.OperatorID]*QueriedOperatorInfo, len(registeredOperators)) + + // Get OpeatroInfo for Registered Operators + getOperatorInfoForQueriedOperators(sc, ctx, operators, registeredOperators) + + return &IndexedQueriedOperatorInfo{ + Operators: operators, + }, nil + +} + +func getOperatorInfoForQueriedOperators(sc *subgraphClient, ctx context.Context, operators map[core.OperatorID]*QueriedOperatorInfo, queriedOperators []*subgraph.Operator) { + + for i := range queriedOperators { + queriedOperator := queriedOperators[i] + operator, err := convertOperator(queriedOperator) var operatorId [32]byte if err != nil && operator == nil { - sc.logger.Warn("failed to convert", "err", err, "operator", deregisteredOperator) + sc.logger.Warn("failed to convert", "err", err, "operator", queriedOperator) continue } @@ -273,17 +342,13 @@ func (sc *subgraphClient) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx con continue } - operators[operatorId] = &DeregisteredOperatorInfo{ + operators[operatorId] = &QueriedOperatorInfo{ IndexedOperatorInfo: indexedOperatorInfo, BlockNumber: uint(operator.BlockNumber), Metadata: operator, OperatorProcessError: "", } } - - return &IndexedDeregisteredOperatorState{ - Operators: operators, - }, nil } func convertBatches(subgraphBatches []*subgraph.Batches) ([]*Batch, error) { @@ -361,8 +426,8 @@ func convertOperator(operator *subgraph.Operator) (*Operator, error) { } // This helper function adds an operator with an error message to the operators map. -func addOperatorWithErrorDetail(operators map[core.OperatorID]*DeregisteredOperatorInfo, operator *Operator, operatorId [32]byte, errorMessage string) { - operators[operatorId] = &DeregisteredOperatorInfo{ +func addOperatorWithErrorDetail(operators map[core.OperatorID]*QueriedOperatorInfo, operator *Operator, operatorId [32]byte, errorMessage string) { + operators[operatorId] = &QueriedOperatorInfo{ IndexedOperatorInfo: nil, BlockNumber: uint(operator.BlockNumber), Metadata: operator, diff --git a/disperser/dataapi/subgraph_client_test.go b/disperser/dataapi/subgraph_client_test.go index 8b2039bc78..1d3768359c 100644 --- a/disperser/dataapi/subgraph_client_test.go +++ b/disperser/dataapi/subgraph_client_test.go @@ -32,7 +32,18 @@ var ( }, } - subgraphOperatorDeregistereds = []*subgraph.Operator{ + subgraphOperatorRegistered = []*subgraph.Operator{ + { + Id: "0x000763fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f211", + OperatorId: "0xe1cdae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311", + Operator: "0x000563fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f211", + BlockTimestamp: "1696975449", + BlockNumber: "87", + TransactionHash: "0x000163fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f211", + }, + } + + subgraphOperatorDeregistered = []*subgraph.Operator{ { Id: "0x000763fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f222", OperatorId: "0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311", @@ -357,7 +368,7 @@ var ( }, } - subgraphDeregisteredOperatorInfo = &subgraph.DeregisteredOperatorInfo{ + subgraphDeregisteredOperatorInfo = &subgraph.OperatorInfo{ IndexedOperatorInfo: subgraphIndexedOperatorInfo1, BlockNumber: 22, Metadata: &subgraph.Operator{ @@ -370,7 +381,7 @@ var ( }, } - subgraphDeregisteredOperatorInfo2 = &subgraph.DeregisteredOperatorInfo{ + subgraphDeregisteredOperatorInfo2 = &subgraph.OperatorInfo{ IndexedOperatorInfo: subgraphIndexedOperatorInfo2, BlockNumber: 24, Metadata: &subgraph.Operator{ @@ -383,7 +394,7 @@ var ( }, } - subgraphDeregisteredOperatorInfo3 = &subgraph.DeregisteredOperatorInfo{ + subgraphDeregisteredOperatorInfo3 = &subgraph.OperatorInfo{ IndexedOperatorInfo: subgraphIndexedOperatorInfo2, BlockNumber: 24, Metadata: &subgraph.Operator{ @@ -396,7 +407,7 @@ var ( }, } - subgraphDeregisteredOperatorInfoNoSocketInfo = &subgraph.DeregisteredOperatorInfo{ + subgraphDeregisteredOperatorInfoNoSocketInfo = &subgraph.OperatorInfo{ IndexedOperatorInfo: subgraphIndexedOperatorInfoNoSocketInfo, BlockNumber: 22, Metadata: &subgraph.Operator{ @@ -409,7 +420,7 @@ var ( }, } - subgraphDeregisteredOperatorInfoInvalidTimeStamp = &subgraph.DeregisteredOperatorInfo{ + subgraphDeregisteredOperatorInfoInvalidTimeStamp = &subgraph.OperatorInfo{ IndexedOperatorInfo: subgraphIndexedOperatorInfo1, BlockNumber: 22, Metadata: &subgraph.Operator{ @@ -477,10 +488,10 @@ func TestQueryOperators(t *testing.T) { func TestQueryIndexedDeregisteredOperatorsForTimeWindow(t *testing.T) { mockSubgraphApi := &subgraphmock.MockSubgraphApi{} - mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) + mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) subgraphClient := dataapi.NewSubgraphClient(mockSubgraphApi, logging.NewNoopLogger()) - indexedDeregisteredOperatorState, err := subgraphClient.QueryIndexedDeregisteredOperatorsForTimeWindow(context.Background(), 1) + indexedDeregisteredOperatorState, err := subgraphClient.QueryIndexedOperatorsWithStateForTimeWindow(context.Background(), 1, dataapi.Deregistered) assert.NoError(t, err) operators := indexedDeregisteredOperatorState.Operators @@ -504,6 +515,35 @@ func TestQueryIndexedDeregisteredOperatorsForTimeWindow(t *testing.T) { assert.Equal(t, uint64(22), uint64(operator.Metadata.BlockNumber)) } +func TestQueryIndexedRegisteredOperatorsForTimeWindow(t *testing.T) { + mockSubgraphApi := &subgraphmock.MockSubgraphApi{} + mockSubgraphApi.On("QueryRegisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorRegistered, nil) + mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) + subgraphClient := dataapi.NewSubgraphClient(mockSubgraphApi, logging.NewNoopLogger()) + indexedRegisteredOperatorState, err := subgraphClient.QueryIndexedOperatorsWithStateForTimeWindow(context.Background(), 1, dataapi.Registered) + assert.NoError(t, err) + + operators := indexedRegisteredOperatorState.Operators + assert.Equal(t, 1, len(operators)) + + var operatorId [32]byte + copy(operatorId[:], []byte("0xe1cdae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311")) + operator := operators[operatorId] + + assert.NotNil(t, operator) + + expectedIndexedOperatorInfo, err := dataapi.ConvertOperatorInfoGqlToIndexedOperatorInfo(subgraphIndexedOperatorInfo1) + assert.NoError(t, err) + + assert.Equal(t, expectedIndexedOperatorInfo.PubkeyG1, operator.IndexedOperatorInfo.PubkeyG1) + assert.Equal(t, expectedIndexedOperatorInfo.PubkeyG2, operator.IndexedOperatorInfo.PubkeyG2) + assert.Equal(t, "localhost:32006;32007", operator.IndexedOperatorInfo.Socket) + assert.Equal(t, uint64(87), uint64(operator.BlockNumber)) + assert.Equal(t, "0xe1cdae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311", operator.Metadata.OperatorId) + assert.Equal(t, "0x000163fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f211", operator.Metadata.TransactionHash) + assert.Equal(t, uint64(87), uint64(operator.Metadata.BlockNumber)) +} + func TestQueryBatchNonSigningInfoInInterval(t *testing.T) { mockSubgraphApi := &subgraphmock.MockSubgraphApi{} mockSubgraphApi.On("QueryBatchNonSigningInfo", int64(0), int64(1)).Return(batchNonSigningInfo, nil) diff --git a/disperser/dataapi/utils.go b/disperser/dataapi/utils.go index 6d98e146a3..148504cf51 100644 --- a/disperser/dataapi/utils.go +++ b/disperser/dataapi/utils.go @@ -3,6 +3,7 @@ package dataapi import ( "encoding/hex" "errors" + "fmt" "strings" "time" @@ -37,37 +38,44 @@ func ConvertNanosecondToSecond(timestamp uint64) uint64 { } func ConvertOperatorInfoGqlToIndexedOperatorInfo(operator *subgraph.IndexedOperatorInfo) (*core.IndexedOperatorInfo, error) { + if operator == nil { + return nil, errors.New("operator is nil") + } if len(operator.SocketUpdates) == 0 { - return nil, errors.New("no socket found for operator") + return nil, errors.New("no socket updates found for operator") } pubkeyG1 := new(bn254.G1Affine) _, err := pubkeyG1.X.SetString(string(operator.PubkeyG1_X)) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to set PubkeyG1_X: %v", err) } _, err = pubkeyG1.Y.SetString(string(operator.PubkeyG1_Y)) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to set PubkeyG1_Y: %v", err) + } + + if len(operator.PubkeyG2_X) < 2 || len(operator.PubkeyG2_Y) < 2 { + return nil, errors.New("incomplete PubkeyG2 coordinates") } pubkeyG2 := new(bn254.G2Affine) _, err = pubkeyG2.X.A1.SetString(string(operator.PubkeyG2_X[0])) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to set PubkeyG2_X[0]: %v", err) } _, err = pubkeyG2.X.A0.SetString(string(operator.PubkeyG2_X[1])) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to set PubkeyG2_X[1]: %v", err) } _, err = pubkeyG2.Y.A1.SetString(string(operator.PubkeyG2_Y[0])) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to set PubkeyG2_Y[0]: %v", err) } _, err = pubkeyG2.Y.A0.SetString(string(operator.PubkeyG2_Y[1])) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to set PubkeyG2_Y[1]: %v", err) } return &core.IndexedOperatorInfo{