From e903682cf00aa9ee18c6e9c74d184263e1dad3df Mon Sep 17 00:00:00 2001 From: siddimore Date: Thu, 7 Mar 2024 21:10:46 -0800 Subject: [PATCH] [DataApi] Add EigenDA Public Service Availability Handler (#276) Co-authored-by: Siddharth More --- disperser/cmd/dataapi/config.go | 7 +- disperser/cmd/dataapi/flags/flags.go | 22 +- disperser/cmd/dataapi/main.go | 33 +- disperser/dataapi/config.go | 8 +- disperser/dataapi/docs/docs.go | 62 +++ disperser/dataapi/docs/swagger.json | 62 +++ disperser/dataapi/docs/swagger.yaml | 40 ++ disperser/dataapi/server.go | 139 ++++++- disperser/dataapi/server_test.go | 389 +++++++++++++++++- .../dataapi/service_availability_handler.go | 140 +++++++ 10 files changed, 866 insertions(+), 36 deletions(-) create mode 100644 disperser/dataapi/service_availability_handler.go diff --git a/disperser/cmd/dataapi/config.go b/disperser/cmd/dataapi/config.go index 0689bb747b..04aa010ba5 100644 --- a/disperser/cmd/dataapi/config.go +++ b/disperser/cmd/dataapi/config.go @@ -28,6 +28,9 @@ type Config struct { BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string + + DisperserHostname string + ChurnerHostname string } func NewConfig(ctx *cli.Context) Config { @@ -54,8 +57,10 @@ func NewConfig(ctx *cli.Context) Config { AllowOrigins: ctx.GlobalStringSlice(flags.AllowOriginsFlag.Name), MetricsConfig: dataapi.MetricsConfig{ HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), - EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name), + EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name), }, + DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name), + ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name), } return config } diff --git a/disperser/cmd/dataapi/flags/flags.go b/disperser/cmd/dataapi/flags/flags.go index bd680eb830..70f5554b63 100644 --- a/disperser/cmd/dataapi/flags/flags.go +++ b/disperser/cmd/dataapi/flags/flags.go @@ -96,12 +96,28 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, 
"ALLOW_ORIGINS"), Required: true, } - EnableMetrics = cli.BoolFlag{ + EnableMetricsFlag = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "enable-metrics"), Usage: "start metrics server", Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_METRICS"), } + // EigenDA Disperser and Churner Hostnames to check Server Availability + // ex: + // disperser-goerli.eigenda.eigenops.xyz, + // churner-goerli.eigenda.eigenops.xyz + DisperserHostnameFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "eigenda-disperser-hostname"), + Usage: "HostName of EigenDA Disperser", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_DISPERSER_HOSTNAME"), + } + ChurnerHostnameFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "eigenda-churner-hostname"), + Usage: "HostName of EigenDA Churner", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_CHURNER_HOSTNAME"), + } /* Optional Flags*/ MetricsHTTPPort = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"), @@ -125,7 +141,9 @@ var requiredFlags = []cli.Flag{ PrometheusServerSecretFlag, PrometheusMetricsClusterLabelFlag, AllowOriginsFlag, - EnableMetrics, + EnableMetricsFlag, + DisperserHostnameFlag, + ChurnerHostnameFlag, } var optionalFlags = []cli.Flag{ diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index 3027ed9eb9..e98aeaa1bd 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -5,6 +5,8 @@ import ( "fmt" "log" "os" + "os/signal" + "syscall" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigenda/common/aws/s3" @@ -90,9 +92,11 @@ func RunDataApi(ctx *cli.Context) error { metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger) server = dataapi.NewServer( dataapi.Config{ - ServerMode: config.ServerMode, - SocketAddr: config.SocketAddr, - AllowOrigins: config.AllowOrigins, + ServerMode: config.ServerMode, + SocketAddr: 
config.SocketAddr, + AllowOrigins: config.AllowOrigins, + DisperserHostname: config.DisperserHostname, + ChurnerHostname: config.ChurnerHostname, }, sharedStorage, promClient, @@ -101,6 +105,8 @@ func RunDataApi(ctx *cli.Context) error { chainState, logger, metrics, + nil, + nil, ) ) @@ -111,5 +117,26 @@ func RunDataApi(ctx *cli.Context) error { logger.Info("Enabled metrics for Data Access API", "socket", httpSocket) } + // Setup channel to listen for termination signals + quit := make(chan os.Signal, 1) + // catch SIGINT (Ctrl+C) and SIGTERM (e.g., from `kill`) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + + // Run server in a separate goroutine so that it doesn't block. + go func() { + if err := server.Start(); err != nil { + logger.Fatalf("Failed to start server: %v", err) + } + }() + + // Block until a signal is received. + <-quit + logger.Info("Shutting down server...") + err = server.Shutdown() + + if err != nil { + logger.Errorf("Failed to shutdown server: %v", err) + } + return server.Start() } diff --git a/disperser/dataapi/config.go b/disperser/dataapi/config.go index b30765729c..c22bd1f054 100644 --- a/disperser/dataapi/config.go +++ b/disperser/dataapi/config.go @@ -1,7 +1,9 @@ package dataapi type Config struct { - SocketAddr string - ServerMode string - AllowOrigins []string + SocketAddr string + ServerMode string + AllowOrigins []string + DisperserHostname string + ChurnerHostname string } diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 5e10b4f3fb..c950c2f63c 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -15,6 +15,43 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { + "/eigenda/service-availability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "ServiceAvailability" + ], + "summary": "Get status of public EigenDA services.", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": 
"#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/feed/blobs": { "get": { "produces": [ @@ -542,6 +579,31 @@ const docTemplate = `{ } } }, + "dataapi.ServiceAvailability": { + "type": "object", + "properties": { + "service_name": { + "type": "string" + }, + "service_status": { + "type": "string" + } + } + }, + "dataapi.ServiceAvailabilityResponse": { + "type": "object", + "properties": { + "data": { + "type": "array", + "items": { + "$ref": "#/definitions/dataapi.ServiceAvailability" + } + }, + "meta": { + "$ref": "#/definitions/dataapi.Meta" + } + } + }, "dataapi.Throughput": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index 72da019dac..e92f70841b 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -11,6 +11,43 @@ "version": "1" }, "paths": { + "/eigenda/service-availability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "ServiceAvailability" + ], + "summary": "Get status of public EigenDA services.", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/feed/blobs": { "get": { "produces": [ 
@@ -538,6 +575,31 @@ } } }, + "dataapi.ServiceAvailability": { + "type": "object", + "properties": { + "service_name": { + "type": "string" + }, + "service_status": { + "type": "string" + } + } + }, + "dataapi.ServiceAvailabilityResponse": { + "type": "object", + "properties": { + "data": { + "type": "array", + "items": { + "$ref": "#/definitions/dataapi.ServiceAvailability" + } + }, + "meta": { + "$ref": "#/definitions/dataapi.Meta" + } + } + }, "dataapi.Throughput": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index d033e01aa8..e458331820 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -132,6 +132,22 @@ definitions: meta: $ref: '#/definitions/dataapi.Meta' type: object + dataapi.ServiceAvailability: + properties: + service_name: + type: string + service_status: + type: string + type: object + dataapi.ServiceAvailabilityResponse: + properties: + data: + items: + $ref: '#/definitions/dataapi.ServiceAvailability' + type: array + meta: + $ref: '#/definitions/dataapi.Meta' + type: object dataapi.Throughput: properties: throughput: @@ -194,6 +210,30 @@ info: title: EigenDA Data Access API version: "1" paths: + /eigenda/service-availability: + get: + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.ServiceAvailabilityResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Get status of public EigenDA services. 
+ tags: + - ServiceAvailability /feed/blobs: get: parameters: diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 6fe520ba25..f89dc53279 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -7,12 +7,14 @@ import ( "os" "os/signal" "strconv" + "strings" "syscall" "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" + "google.golang.org/grpc/health/grpc_health_v1" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/dataapi/docs" @@ -31,6 +33,11 @@ const ( var errNotFound = errors.New("not found") +type EigenDAServiceChecker interface { + CheckHealth(ctx context.Context, serviceName string) (*grpc_health_v1.HealthCheckResponse, error) + CloseConnections() error +} + type ( BlobMetadataResponse struct { BlobKey string `json:"blob_key"` @@ -95,6 +102,16 @@ type ( Data []*DeregisteredOperatorMetadata `json:"data"` } + ServiceAvailability struct { + ServiceName string `json:"service_name"` + ServiceStatus string `json:"service_status"` + } + + ServiceAvailabilityResponse struct { + Meta Meta `json:"meta"` + Data []*ServiceAvailability `json:"data"` + } + ErrorResponse struct { Error string `json:"error"` } @@ -110,7 +127,10 @@ type ( transactor core.Transactor chainState core.ChainState - metrics *Metrics + metrics *Metrics + disperserHostName string + churnerHostName string + eigenDAServiceChecker EigenDAServiceChecker } ) @@ -123,18 +143,34 @@ func NewServer( chainState core.ChainState, logger common.Logger, metrics *Metrics, + grpcConn GRPCConn, + eigenDAServiceChecker EigenDAServiceChecker, + ) *server { + // Initialize the health checker service for EigenDA services + if grpcConn == nil { + grpcConn = &GRPCDialerSkipTLS{} + } + + if eigenDAServiceChecker == nil { + + eigenDAServiceChecker = NewEigenDAServiceHealthCheck(grpcConn, config.DisperserHostname, config.ChurnerHostname) + } + return &server{ - logger: logger, - 
serverMode: config.ServerMode, - socketAddr: config.SocketAddr, - allowOrigins: config.AllowOrigins, - blobstore: blobstore, - promClient: promClient, - subgraphClient: subgraphClient, - transactor: transactor, - chainState: chainState, - metrics: metrics, + logger: logger, + serverMode: config.ServerMode, + socketAddr: config.SocketAddr, + allowOrigins: config.AllowOrigins, + blobstore: blobstore, + promClient: promClient, + subgraphClient: subgraphClient, + transactor: transactor, + chainState: chainState, + metrics: metrics, + disperserHostName: config.DisperserHostname, + churnerHostName: config.ChurnerHostname, + eigenDAServiceChecker: eigenDAServiceChecker, } } @@ -160,6 +196,10 @@ func (s *server) Start() error { { operatorsInfo.GET("/deregistered-operators", s.FetchDeregisteredOperators) } + serviceAvailability := v1.Group("/eigenda") + { + serviceAvailability.GET("/service-availability", s.GetEigenDAServiceAvailability) + } metrics := v1.Group("/metrics") { metrics.GET("/", s.FetchMetricsHandler) @@ -204,6 +244,20 @@ func (s *server) Start() error { return <-errChan } +func (s *server) Shutdown() error { + + if s.eigenDAServiceChecker != nil { + err := s.eigenDAServiceChecker.CloseConnections() + + if err != nil { + s.logger.Error("Failed to close connections", "error", err) + return err + } + } + + return nil +} + // FetchBlobHandler godoc // // @Summary Fetch blob metadata by blob key @@ -469,6 +523,69 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) { }) } +// GetEigenDAServiceAvailability godoc +// +// @Summary Get status of public EigenDA services. 
+// @Tags ServiceAvailability +// @Produce json +// @Success 200 {object} ServiceAvailabilityResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /eigenda/service-availability [get] +func (s *server) GetEigenDAServiceAvailability(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("GetEigenDAServiceAvailability", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + // Get query parameters to filter services + serviceName := c.DefaultQuery("service-name", "") // If not specified, default to return all services + + // If service name is not specified, return all services + services := []string{} + + if serviceName == "disperser" { + services = append(services, "Disperser") + } else if serviceName == "churner" { + services = append(services, "Churner") + } else if serviceName == "" { + services = append(services, "Disperser", "Churner") + } + s.logger.Info("Getting service availability for", "services", strings.Join(services, ", ")) + + availabilityStatuses, err := s.getServiceAvailability(c.Request.Context(), services) + if err != nil { + s.metrics.IncrementFailedRequestNum("GetEigenDAServiceAvailability") + errorResponse(c, err) + return + } + + s.metrics.IncrementSuccessfulRequestNum("GetEigenDAServiceAvailability") + + // Set the status code to 503 if any of the services are not serving + availabilityStatus := http.StatusOK + for _, status := range availabilityStatuses { + if status.ServiceStatus == "NOT_SERVING" { + availabilityStatus = http.StatusServiceUnavailable + break + } + + if status.ServiceStatus == "UNKNOWN" { + availabilityStatus = http.StatusInternalServerError + break + } + + } + + c.JSON(availabilityStatus, ServiceAvailabilityResponse{ + Meta: Meta{ + Size: len(availabilityStatuses), + }, + Data: availabilityStatuses, + 
}) +} + func (s *server) getBlobMetadataByBatchesWithLimit(ctx context.Context, limit int) ([]*Batch, []*disperser.BlobMetadata, error) { var ( blobMetadatas = make([]*disperser.BlobMetadata, 0) diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 2cafffb85d..1a5e9f68a8 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "net/http/httptest" + "sync" "testing" "time" @@ -33,6 +34,7 @@ import ( "github.com/stretchr/testify/mock" "go.uber.org/goleak" "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" ) var ( @@ -48,11 +50,12 @@ var ( prometheusClient = dataapi.NewPrometheusClient(mockPrometheusApi, "test-cluster") mockSubgraphApi = &subgraphmock.MockSubgraphApi{} subgraphClient = dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger) - config = dataapi.Config{ServerMode: "test", SocketAddr: ":8080"} + + config = dataapi.Config{ServerMode: "test", SocketAddr: ":8080", AllowOrigins: []string{"*"}, DisperserHostname: "localhost:32007", ChurnerHostname: "localhost:32009"} mockTx = &coremock.MockTransactor{} mockChainState, _ = coremock.MakeChainDataMock(core.OperatorIndex(1)) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) expectedBatchHeaderHash = [32]byte{1, 2, 3} expectedBlobIndex = uint32(1) expectedRequestedAt = uint64(5567830000000000000) @@ -71,6 +74,54 @@ type MockSubgraphClient struct { mock.Mock } +type MockGRPCConnection struct{} + +func (mc *MockGRPCConnection) Dial(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // Here, return a mock connection. 
How you implement this depends on your testing framework + // and what aspects of the gRPC connection you wish to mock. + // For a simple approach, you might not even need to return a real *grpc.ClientConn + // but rather a mock or stub that satisfies the interface. + return &grpc.ClientConn{}, nil // Simplified, consider using a more sophisticated mock. +} + +type MockGRPNilConnection struct{} + +func (mc *MockGRPNilConnection) Dial(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // Here, return a mock connection. How you implement this depends on your testing framework + // and what aspects of the gRPC connection you wish to mock. + // For a simple approach, you might not even need to return a real *grpc.ClientConn + // but rather a mock or stub that satisfies the interface. + return nil, nil // Simplified, consider using a more sophisticated mock. +} + +type MockHealthCheckService struct { + ResponseMap map[string]*grpc_health_v1.HealthCheckResponse +} + +func NewMockHealthCheckService() *MockHealthCheckService { + return &MockHealthCheckService{ + ResponseMap: make(map[string]*grpc_health_v1.HealthCheckResponse), + } +} + +func (m *MockHealthCheckService) CheckHealth(ctx context.Context, serviceName string) (*grpc_health_v1.HealthCheckResponse, error) { + response, exists := m.ResponseMap[serviceName] + if !exists { + // Simulate an unsupported service error or return a default response + return nil, fmt.Errorf("unsupported service: %s", serviceName) + } + return response, nil +} + +func (m *MockHealthCheckService) CloseConnections() error { + // Close any open connections or resources + return nil +} + +func (m *MockHealthCheckService) AddResponse(serviceName string, response *grpc_health_v1.HealthCheckResponse) { + m.ResponseMap[serviceName] = response +} + func TestFetchBlobHandler(t *testing.T) { r := setUpRouter() @@ -317,7 +368,7 @@ func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { // Set up the 
mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -366,7 +417,7 @@ func TestFetchDeregisteredMultipleOperatorsOneWithNoSocketInfoHandler(t *testing // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -434,7 +485,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() - testDataApiServer 
= dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -479,7 +530,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampTwoOperatorsHandler(t *tes // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -535,7 +586,7 @@ func TestFetchMetricsDeregisteredOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", 
mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -600,7 +651,7 @@ func TestFetchDeregisteredOperatorOffline(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) @@ -653,7 +704,7 @@ func TestFetchDeregisteredOperatorsWithoutDaysQueryParam(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = 
dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -711,7 +762,7 @@ func TestFetchDeregisteredOperatorInvalidDaysQueryParam(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -753,7 +804,7 @@ func TestFetchDeregisteredOperatorQueryDaysGreaterThan30(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + 
testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) @@ -799,7 +850,7 @@ func TestFetchDeregisteredOperatorsMultipleOffline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -857,7 +908,7 @@ func TestFetchDeregisteredOperatorOnline(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger)) + testDataApiServer = dataapi.NewServer(config, blobstore, 
prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil)

 	mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil)

@@ -913,7 +964,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineOnline(t *testing.T) {
 	// Set up the mock calls for the two operators
 	mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once()
 	mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once()
-	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger))
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil)

 	mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil)

@@ -980,7 +1031,7 @@ func TestFetchDeregisteredOperatorsMultipleOnline(t *testing.T) {
 	mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil)
 	mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once()
 	mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once()
-	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger))
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil)

 	mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil)

@@ -1055,7 +1106,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) {
 	mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once()
 	mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once()
 	mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo3, nil).Once()
-	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger))
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil)

 	mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil)

@@ -1105,6 +1156,290 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) {
 	mockSubgraphApi.Calls = nil
 }

+func TestGetServiceAvailability(t *testing.T) {
+	r := setUpRouter()
+	// Both services are stubbed as SERVING; the unfiltered query must answer 200 and list both.
+	mockHealthCheckService := NewMockHealthCheckService()
+	mockHealthCheckService.AddResponse("Disperser", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+	// Churner is stubbed as SERVING as well.
+	mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService)
+	r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability)
+
+	w := httptest.NewRecorder()
+	req := httptest.NewRequest(http.MethodGet, "/v1/eigenda/service-availability", nil)
+	r.ServeHTTP(w, req)
+
+	res := w.Result()
+	defer res.Body.Close()
+
+	data, err := io.ReadAll(res.Body)
+	assert.NoError(t, err)
+
+	var response dataapi.ServiceAvailabilityResponse
+	err = json.Unmarshal(data, &response)
+	assert.NoError(t, err)
+	assert.NotNil(t, response)
+
+	assert.Equal(t, http.StatusOK, res.StatusCode)
+	assert.Equal(t, 2, response.Meta.Size)
+	assert.Equal(t, 2, len(response.Data))
+
+	service1Data := response.Data[0]
+	service2Data := response.Data[1]
+
+	assert.Equal(t, "Disperser", service1Data.ServiceName)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING.String(), service1Data.ServiceStatus)
+
+	assert.Equal(t, "Churner", service2Data.ServiceName)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING.String(), service2Data.ServiceStatus)
+}
+
+func TestGetServiceAvailability_QueryDisperser(t *testing.T) {
+	r := setUpRouter()
+	// Filters on service-name=disperser; expects 200 and a single SERVING Disperser entry.
+	mockHealthCheckService := NewMockHealthCheckService()
+	mockHealthCheckService.AddResponse("Disperser", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+	mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService)
+
+	// Register the handler under test on the router.
+	r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability)
+
+	w := httptest.NewRecorder()
+	req := httptest.NewRequest(http.MethodGet, "/v1/eigenda/service-availability?service-name=disperser", nil)
+	r.ServeHTTP(w, req)
+
+	res := w.Result()
+	defer res.Body.Close()
+
+	data, err := io.ReadAll(res.Body)
+	assert.NoError(t, err)
+
+	var response dataapi.ServiceAvailabilityResponse
+	err = json.Unmarshal(data, &response)
+	assert.NoError(t, err)
+	assert.NotNil(t, response)
+
+	fmt.Printf("Response: %v\n", response)
+
+	assert.Equal(t, http.StatusOK, res.StatusCode)
+	assert.Equal(t, 1, response.Meta.Size)
+	assert.Equal(t, 1, len(response.Data))
+
+	serviceData := response.Data[0]
+	assert.Equal(t, "Disperser", serviceData.ServiceName)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING.String(), serviceData.ServiceStatus)
+}
+
+func TestGetServiceAvailability_QueryInvalidService(t *testing.T) {
+	r := setUpRouter()
+	// Filters on service-name=encoder; the assertions expect 200 with no entries.
+	mockHealthCheckService := NewMockHealthCheckService()
+	mockHealthCheckService.AddResponse("Disperser", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+	// Churner is stubbed as SERVING as well.
+	mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService)
+
+	r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability)
+
+	w := httptest.NewRecorder()
+	req := httptest.NewRequest(http.MethodGet, "/v1/eigenda/service-availability?service-name=encoder", nil)
+	r.ServeHTTP(w, req)
+
+	res := w.Result()
+	defer res.Body.Close()
+
+	data, err := io.ReadAll(res.Body)
+	assert.NoError(t, err)
+
+	var response dataapi.ServiceAvailabilityResponse
+	err = json.Unmarshal(data, &response)
+	assert.NoError(t, err)
+	assert.NotNil(t, response)
+
+	assert.Equal(t, http.StatusOK, res.StatusCode)
+	assert.Equal(t, 0, response.Meta.Size)
+	assert.Equal(t, 0, len(response.Data))
+}
+
+func TestGetServiceAvailability_HealthCheckError(t *testing.T) {
+	r := setUpRouter()
+	// Both services are stubbed as NOT_SERVING; the disperser query must answer 503.
+	mockHealthCheckService := NewMockHealthCheckService()
+	mockHealthCheckService.AddResponse("Disperser", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
+	})
+	// Churner is stubbed as NOT_SERVING too.
+	mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
+	})
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService)
+
+	r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability)
+
+	w := httptest.NewRecorder()
+	req := httptest.NewRequest(http.MethodGet, "/v1/eigenda/service-availability?service-name=disperser", nil)
+	r.ServeHTTP(w, req)
+
+	res := w.Result()
+	defer res.Body.Close()
+
+	data, err := io.ReadAll(res.Body)
+	assert.NoError(t, err)
+
+	var response dataapi.ServiceAvailabilityResponse
+	err = json.Unmarshal(data, &response)
+	assert.NoError(t, err)
+	assert.NotNil(t, response)
+
+	assert.Equal(t, http.StatusServiceUnavailable, res.StatusCode)
+	assert.Equal(t, 1, response.Meta.Size)
+	assert.Equal(t, 1, len(response.Data))
+
+	serviceData := response.Data[0]
+	assert.Equal(t, "Disperser", serviceData.ServiceName)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING.String(), serviceData.ServiceStatus)
+}
+
+func TestGetServiceAvailability_HealthyUnHealthyService(t *testing.T) {
+	r := setUpRouter()
+	mockHealthCheckService := NewMockHealthCheckService()
+	mockHealthCheckService.AddResponse("Disperser", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
+	})
+	// Churner is stubbed as NOT_SERVING too, so the unfiltered query must answer 503 and list both.
+	mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
+	})
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService)
+
+	// Register the handler under test on the router.
+	r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability)
+
+	w := httptest.NewRecorder()
+	req := httptest.NewRequest(http.MethodGet, "/v1/eigenda/service-availability", nil)
+	r.ServeHTTP(w, req)
+
+	res := w.Result()
+	defer res.Body.Close()
+
+	data, err := io.ReadAll(res.Body)
+	assert.NoError(t, err)
+
+	var response dataapi.ServiceAvailabilityResponse
+	err = json.Unmarshal(data, &response)
+	assert.NoError(t, err)
+	assert.NotNil(t, response)
+
+	assert.Equal(t, http.StatusServiceUnavailable, res.StatusCode)
+	assert.Equal(t, 2, response.Meta.Size)
+	assert.Equal(t, 2, len(response.Data))
+
+	service1Data := response.Data[0]
+	service2Data := response.Data[1]
+
+	assert.Equal(t, "Disperser", service1Data.ServiceName)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING.String(), service1Data.ServiceStatus)
+
+	assert.Equal(t, "Churner", service2Data.ServiceName)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_NOT_SERVING.String(), service2Data.ServiceStatus)
+}
+
+func TestGetServiceAvailability_QueryDisperser_MultipleRequests(t *testing.T) {
+	r := setUpRouter()
+	// Fires 12 concurrent disperser queries; processResponse checks every reply.
+	mockHealthCheckService := NewMockHealthCheckService()
+	mockHealthCheckService.AddResponse("Disperser", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+	// Churner is stubbed as SERVING as well.
+	mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService)
+
+	r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability)
+
+	var concurrentRequests sync.WaitGroup
+	responses := make(chan *http.Response, 12) // Buffered for all 12 replies, so no sender blocks
+
+	for i := 0; i < 12; i++ {
+		concurrentRequests.Add(1)
+		go func() {
+			defer concurrentRequests.Done()
+			w := httptest.NewRecorder()
+			req := httptest.NewRequest(http.MethodGet, "/v1/eigenda/service-availability?service-name=disperser", nil)
+			r.ServeHTTP(w, req)
+			responses <- w.Result()
+		}()
+	}
+
+	concurrentRequests.Wait() // Wait for all requests to be processed
+	close(responses)          // Close the channel after all goroutines complete
+
+	// Process responses
+	for res := range responses {
+		processResponse(t, res)
+	}
+}
+
+func TestGetServiceAvailability_HealthCheckerNilConnection(t *testing.T) {
+	r := setUpRouter()
+	// NOTE: NewServer is given a nil checker below, so these mock responses are not handed to it.
+	mockHealthCheckService := NewMockHealthCheckService()
+	mockHealthCheckService.AddResponse("Disperser", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+	mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{
+		Status: grpc_health_v1.HealthCheckResponse_SERVING,
+	})
+
+	testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, &commock.Logger{}), mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPNilConnection{}, nil)
+
+	// Register the handler under test on the router.
+	r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability)
+
+	w := httptest.NewRecorder()
+	req := httptest.NewRequest(http.MethodGet, "/v1/eigenda/service-availability?service-name=disperser", nil)
+	r.ServeHTTP(w, req)
+
+	res := w.Result()
+	defer res.Body.Close()
+
+	data, err := io.ReadAll(res.Body)
+	assert.NoError(t, err)
+
+	var response dataapi.ServiceAvailabilityResponse
+	err = json.Unmarshal(data, &response)
+	assert.NoError(t, err)
+	assert.NotNil(t, response)
+
+	fmt.Printf("Response: %v\n", response)
+
+	assert.Equal(t, http.StatusInternalServerError, res.StatusCode)
+	assert.Equal(t, 1, response.Meta.Size)
+	assert.Equal(t, 1, len(response.Data))
+
+	serviceData := response.Data[0]
+	assert.Equal(t, "Disperser", serviceData.ServiceName)
+	assert.Equal(t, grpc_health_v1.HealthCheckResponse_UNKNOWN.String(), serviceData.ServiceStatus)
+}
+
 func setUpRouter() *gin.Engine {
 	return gin.Default()
 }
@@ -1214,3 +1549,25 @@ func getOperatorData(operatorMetadtas []*dataapi.DeregisteredOperatorMetadata, o

 	return dataapi.DeregisteredOperatorMetadata{}
 }
+
+// processResponse closes res.Body after asserting a 200 reply whose first entry is a SERVING Disperser.
+func processResponse(t *testing.T, res *http.Response) {
+	defer res.Body.Close()
+	data, err := io.ReadAll(res.Body)
+	assert.NoError(t, err)
+
+	var response dataapi.ServiceAvailabilityResponse
+	err = json.Unmarshal(data, &response)
+	assert.NoError(t, err)
+	assert.NotNil(t, response)
+
+	assert.Equal(t, http.StatusOK, res.StatusCode)
+	assert.GreaterOrEqual(t, response.Meta.Size, 1)
+	assert.GreaterOrEqual(t, len(response.Data), 1)
+
+	if len(response.Data) > 0 {
+		serviceData := response.Data[0]
+		assert.Equal(t, "Disperser", serviceData.ServiceName)
+		assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING.String(), serviceData.ServiceStatus)
+	}
+}
diff --git a/disperser/dataapi/service_availability_handler.go b/disperser/dataapi/service_availability_handler.go
new file mode 100644
index 0000000000..7a9d9c1f2c
--- /dev/null
+++ b/disperser/dataapi/service_availability_handler.go
@@ -0,0 +1,179 @@
+package dataapi
+
+import (
+	"context"
+	"crypto/tls"
+	"errors"
+	"fmt"
+	"strings"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/health/grpc_health_v1"
+)
+
+// Sentinel errors returned by CheckHealth when a connection was never
+// established. The messages are unchanged from the earlier fmt.Errorf values,
+// so callers that compare error text keep working.
+var (
+	errDisperserConnNil = errors.New("disperser connection is nil")
+	errChurnerConnNil   = errors.New("churner connection is nil")
+)
+
+// GRPCConn abstracts dialing a gRPC endpoint so that a different dialer can be
+// injected (the tests pass their own implementation).
+type GRPCConn interface {
+	Dial(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
+}
+
+// GRPCDialerSkipTLS is the GRPCConn implementation backed by grpc.Dial. It
+// forwards the dial options untouched; transport credentials are chosen by the
+// caller.
+type GRPCDialerSkipTLS struct{}
+
+// EigenDAServiceAvailabilityCheck holds the pre-dialed connections used to
+// health-check the disperser and the churner.
+type EigenDAServiceAvailabilityCheck struct {
+	disperserConn *grpc.ClientConn
+	churnerConn   *grpc.ClientConn
+}
+
+// getServiceAvailability reports the health of each named service, in the
+// order given.
+//
+// A service whose connection was never established is reported as UNKNOWN, any
+// other health-check failure as NOT_SERVING, and a successful check reports the
+// status carried by the response. Per-service failures are logged, not
+// returned; the error result is non-nil only when services is nil or no
+// checker is configured.
+func (s *server) getServiceAvailability(ctx context.Context, services []string) ([]*ServiceAvailability, error) {
+	if services == nil {
+		return nil, fmt.Errorf("services cannot be nil")
+	}
+	// NewEigenDAServiceHealthCheck yields nil when dialing fails, and calling
+	// CheckHealth on a nil checker would panic.
+	if s.eigenDAServiceChecker == nil {
+		return nil, fmt.Errorf("service health checker is not configured")
+	}
+
+	availabilityStatuses := make([]*ServiceAvailability, len(services))
+	for i, serviceName := range services {
+		s.logger.Info("checking service health", "service", serviceName)
+
+		// Any failure that is not a missing connection counts as NOT_SERVING.
+		status := grpc_health_v1.HealthCheckResponse_NOT_SERVING
+		response, err := s.eigenDAServiceChecker.CheckHealth(ctx, serviceName)
+		switch {
+		case err == nil:
+			// GetStatus tolerates a nil response, unlike a direct field read.
+			status = response.GetStatus()
+			s.logger.Info("service status", "service", serviceName, "status", status.String())
+		case isNilConnectionError(err):
+			status = grpc_health_v1.HealthCheckResponse_UNKNOWN
+			s.logger.Error("service connection is nil", "service", serviceName, "err", err)
+		default:
+			s.logger.Error("failed to check service health", "service", serviceName, "err", err)
+		}
+
+		availabilityStatuses[i] = &ServiceAvailability{
+			ServiceName:   serviceName,
+			ServiceStatus: status.String(),
+		}
+	}
+	return availabilityStatuses, nil
+}
+
+// isNilConnectionError reports whether err says that CheckHealth had no
+// connection to use. Besides the sentinels it accepts a bare message match, so
+// checkers that build the same error text themselves are still recognised.
+func isNilConnectionError(err error) bool {
+	if errors.Is(err, errDisperserConnNil) || errors.Is(err, errChurnerConnNil) {
+		return true
+	}
+	msg := err.Error()
+	return msg == errDisperserConnNil.Error() || msg == errChurnerConnNil.Error()
+}
+
+// NewEigenDAServiceHealthCheck dials the disperser and the churner once, up
+// front, so that health checks reuse the connections instead of dialing on
+// every request. It returns nil when either dial fails.
+//
+// NOTE(review): both dials use grpc.WithBlock and no deadline is passed here;
+// confirm that blocking during construction is acceptable.
+// NOTE(security): InsecureSkipVerify turns off server certificate
+// verification; confirm this is intended for every deployment.
+func NewEigenDAServiceHealthCheck(grpcConnection GRPCConn, disperserHostName, churnerHostName string) EigenDAServiceChecker {
+	dialOpts := []grpc.DialOption{
+		grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})),
+		grpc.WithBlock(),
+	}
+
+	disperserConn, err := grpcConnection.Dial(disperserHostName, dialOpts...)
+	if err != nil {
+		return nil
+	}
+
+	churnerConn, err := grpcConnection.Dial(churnerHostName, dialOpts...)
+	if err != nil {
+		// Release the disperser connection dialed above instead of leaking it.
+		if disperserConn != nil {
+			_ = disperserConn.Close() // best effort: the dial failure is what is reported
+		}
+		return nil
+	}
+
+	return &EigenDAServiceAvailabilityCheck{
+		disperserConn: disperserConn,
+		churnerConn:   churnerConn,
+	}
+}
+
+// Dial opens a connection to serviceName by delegating to grpc.Dial.
+func (rc *GRPCDialerSkipTLS) Dial(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+	return grpc.Dial(serviceName, opts...)
+}
+
+// CheckHealth sends an empty HealthCheckRequest over the connection of the
+// named service. serviceName is matched case-insensitively against "disperser"
+// and "churner"; any other name, or a missing connection, yields an error.
+func (sac *EigenDAServiceAvailabilityCheck) CheckHealth(ctx context.Context, serviceName string) (*grpc_health_v1.HealthCheckResponse, error) {
+	serviceName = strings.ToLower(serviceName)
+
+	var client grpc_health_v1.HealthClient
+	switch serviceName {
+	case "disperser":
+		if sac.disperserConn == nil {
+			return nil, errDisperserConnNil
+		}
+		client = grpc_health_v1.NewHealthClient(sac.disperserConn)
+	case "churner":
+		if sac.churnerConn == nil {
+			return nil, errChurnerConnNil
+		}
+		client = grpc_health_v1.NewHealthClient(sac.churnerConn)
+	default:
+		return nil, fmt.Errorf("unsupported service: %s", serviceName)
+	}
+
+	return client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
+}
+
+// CloseConnections closes whichever connections are open. Both are always
+// attempted; the returned error describes every Close that failed.
+func (sac *EigenDAServiceAvailabilityCheck) CloseConnections() error {
+	var closeErr error
+	if sac.disperserConn != nil {
+		if err := sac.disperserConn.Close(); err != nil {
+			closeErr = fmt.Errorf("closing disperser connection: %w", err)
+		}
+	}
+	if sac.churnerConn != nil {
+		if err := sac.churnerConn.Close(); err != nil {
+			if closeErr != nil {
+				return fmt.Errorf("%v; closing churner connection: %w", closeErr, err)
+			}
+			return fmt.Errorf("closing churner connection: %w", err)
+		}
+	}
+	return closeErr
+}