Skip to content

Commit

Permalink
refactor ejector out of dataapi
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Jun 18, 2024
1 parent 2ade913 commit 3b05c71
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 122 deletions.
3 changes: 2 additions & 1 deletion disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/dataapi"
"github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus"
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
"github.com/Layr-Labs/eigenda/operators/ejector"
walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/signerv2"
Expand Down Expand Up @@ -124,7 +125,7 @@ func RunDataApi(ctx *cli.Context) error {
subgraphClient,
tx,
chainState,
dataapi.NewEjector(wallet, client, logger, tx, metrics, config.TxnTimeout, config.NonsigningRateThreshold),
ejector.NewEjector(wallet, client, logger, tx, metrics.EjectorMetrics, config.TxnTimeout, config.NonsigningRateThreshold),
logger,
metrics,
nil,
Expand Down
101 changes: 8 additions & 93 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (

"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/Layr-Labs/eigenda/operators/ejector"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
)

type MetricsConfig struct {
Expand All @@ -23,14 +23,9 @@ type MetricsConfig struct {
type Metrics struct {
registry *prometheus.Registry

NumRequests *prometheus.CounterVec
Latency *prometheus.SummaryVec

PeriodicEjectionRequests *prometheus.CounterVec
UrgentEjectionRequests *prometheus.CounterVec
OperatorsToEject *prometheus.CounterVec
StakeShareToEject *prometheus.GaugeVec
EjectionGasUsed prometheus.Gauge
NumRequests *prometheus.CounterVec
Latency *prometheus.SummaryVec
EjectorMetrics *ejector.Metrics

httpPort string
logger logging.Logger
Expand Down Expand Up @@ -60,58 +55,10 @@ func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string,
},
[]string{"method"},
),
// PeriodicEjectionRequests is a more detailed metric than NumRequests, specifically for
// tracking the ejection calls that are periodically initiated according to the SLA
// evaluation time window.
PeriodicEjectionRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "periodic_ejection_requests_total",
Help: "the total number of periodic ejection requests",
},
[]string{"status"},
),
// UrgentEjectionRequests is a more detailed metric than NumRequests, specifically for
// tracking the ejection calls that are urgently initiated due to bad network health
// condition.
UrgentEjectionRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "urgent_ejection_requests_total",
Help: "the total number of urgent ejection requests",
},
[]string{"status"},
),
// The number of operators requested to eject. Note this may be different than the
// actual number of operators ejected as EjectionManager contract may perform rate
// limiting.
OperatorsToEject: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "operators_to_eject",
Help: "the total number of operators requested to eject",
}, []string{"quorum"},
),
// The total stake share requested to eject. Note this may be different than the
// actual stake share ejected as EjectionManager contract may perform rate limiting.
StakeShareToEject: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "stake_share_to_eject",
Help: "the total stake share requested to eject",
}, []string{"quorum"},
),
// The gas used by EjectionManager contract for operator ejection.
EjectionGasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "ejection_gas_used",
Help: "Gas used for operator ejection",
},
),
registry: reg,
httpPort: httpPort,
logger: logger.With("component", "DataAPIMetrics"),
EjectorMetrics: ejector.NewMetrics(reg, logger),
registry: reg,
httpPort: httpPort,
logger: logger.With("component", "DataAPIMetrics"),
}
return metrics
}
Expand All @@ -137,38 +84,6 @@ func (g *Metrics) IncrementFailedRequestNum(method string) {
}).Inc()
}

func (g *Metrics) IncrementEjectionRequest(mode string, status codes.Code) {
switch mode {
case "periodic":
g.PeriodicEjectionRequests.With(prometheus.Labels{
"status": status.String(),
}).Inc()
case "urgent":
g.UrgentEjectionRequests.With(prometheus.Labels{
"status": status.String(),
}).Inc()
}
}

func (g *Metrics) UpdateRequestedOperatorMetric(numOperatorsByQuorum map[uint8]int, stakeShareByQuorum map[uint8]float64) {
for q, count := range numOperatorsByQuorum {
for i := 0; i < count; i++ {
g.OperatorsToEject.With(prometheus.Labels{
"quorum": fmt.Sprintf("%d", q),
}).Inc()
}
}
for q, stakeShare := range stakeShareByQuorum {
g.StakeShareToEject.With(prometheus.Labels{
"quorum": fmt.Sprintf("%d", q),
}).Set(stakeShare)
}
}

func (g *Metrics) UpdateEjectionGasUsed(gasUsed uint64) {
g.EjectionGasUsed.Set(float64(gasUsed))
}

// IncrementNotFoundRequestNum increments the number of not found requests
func (g *Metrics) IncrementNotFoundRequestNum(method string) {
g.NumRequests.With(prometheus.Labels{
Expand Down
34 changes: 19 additions & 15 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/operators/ejector"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/Layr-Labs/eigenda/disperser"
Expand Down Expand Up @@ -94,10 +94,6 @@ type (
Timestamp uint64 `json:"timestamp"`
}

EjectionResponse struct {
TransactionHash string `json:"transaction_hash"`
}

Meta struct {
Size int `json:"size"`
}
Expand Down Expand Up @@ -170,7 +166,7 @@ type (
subgraphClient SubgraphClient
transactor core.Transactor
chainState core.ChainState
ejector *Ejector
ejector *ejector.Ejector
ejectionToken string

metrics *Metrics
Expand All @@ -189,7 +185,7 @@ func NewServer(
subgraphClient SubgraphClient,
transactor core.Transactor,
chainState core.ChainState,
ejector *Ejector,
ejector *ejector.Ejector,
logger logging.Logger,
metrics *Metrics,
grpcConn GRPCConn,
Expand Down Expand Up @@ -344,9 +340,9 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) {
return
}

mode := "periodic"
if c.Query("mode") != "" {
mode = c.Query("mode")
mode := ejector.PeriodicMode
if c.Query("mode") == "urgent" {
mode = ejector.UrgentMode
}

endTime := time.Now()
Expand All @@ -355,7 +351,6 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) {
endTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("end"))
if err != nil {
s.metrics.IncrementFailedRequestNum("EjectOperators")
s.metrics.IncrementEjectionRequest(mode, codes.InvalidArgument)
errorResponse(c, err)
return
}
Expand All @@ -367,18 +362,27 @@ func (s *server) EjectOperatorsHandler(c *gin.Context) {
}

nonSigningRate, err := s.getOperatorNonsigningRate(c.Request.Context(), endTime.Unix()-interval, endTime.Unix(), true)
var ejectionResponse *EjectionResponse
var ejectionResponse *ejector.EjectionResponse
if err == nil {
ejectionResponse, err = s.ejector.Eject(c.Request.Context(), nonSigningRate)
nonSigningMetrics := make([]*ejector.NonSignerMetric, 0)
for _, metric := range nonSigningRate.Data {
nonSigningMetrics = append(nonSigningMetrics, &ejector.NonSignerMetric{
OperatorId: metric.OperatorId,
OperatorAddress: metric.OperatorAddress,
QuorumId: metric.QuorumId,
TotalUnsignedBatches: metric.TotalUnsignedBatches,
Percentage: metric.Percentage,
StakePercentage: metric.StakePercentage,
})
}
ejectionResponse, err = s.ejector.Eject(c.Request.Context(), nonSigningMetrics, mode)
}
if err != nil {
s.metrics.IncrementFailedRequestNum("EjectOperators")
s.metrics.IncrementEjectionRequest(mode, codes.Internal)
errorResponse(c, err)
return
}
s.metrics.IncrementSuccessfulRequestNum("EjectOperators")
s.metrics.IncrementEjectionRequest(mode, codes.OK)
c.JSON(http.StatusOK, ejectionResponse)
}

Expand Down
10 changes: 7 additions & 3 deletions disperser/dataapi/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
subgraphmock "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph/mock"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/operators/ejector"
sdkmock "github.com/Layr-Labs/eigensdk-go/chainio/clients/mocks"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
"github.com/ethereum/go-ethereum/common"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/gin-gonic/gin"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -58,6 +60,7 @@ var (
config = dataapi.Config{ServerMode: "test", SocketAddr: ":8080", AllowOrigins: []string{"*"}, DisperserHostname: "localhost:32007", ChurnerHostname: "localhost:32009", EjectionToken: "deadbeef"}

mockTx = &coremock.MockTransactor{}
reg = prom.NewRegistry()
metrics = dataapi.NewMetrics(nil, "9001", mockLogger)
opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311")
opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312")
Expand Down Expand Up @@ -386,7 +389,7 @@ func TestEjectOperatorHandler(t *testing.T) {
data, err := io.ReadAll(res.Body)
assert.NoError(t, err)

var response dataapi.EjectionResponse
var response ejector.EjectionResponse
err = json.Unmarshal(data, &response)
assert.NoError(t, err)
assert.NotNil(t, response)
Expand Down Expand Up @@ -455,14 +458,15 @@ func TestFetchUnsignedBatchesHandler(t *testing.T) {
type ejectorComponents struct {
wallet *sdkmock.MockWallet
ethClient *commonmock.MockEthClient
ejector *dataapi.Ejector
ejector *ejector.Ejector
}

func getEjector(t *testing.T) *ejectorComponents {
ctrl := gomock.NewController(t)
w := sdkmock.NewMockWallet(ctrl)
ethClient := &commonmock.MockEthClient{}
ejector := dataapi.NewEjector(w, ethClient, mockLogger, mockTx, metrics, 100*time.Millisecond, -1)
m := ejector.NewMetrics(reg, mockLogger)
ejector := ejector.NewEjector(w, ethClient, mockLogger, mockTx, m, 100*time.Millisecond, -1)
return &ejectorComponents{
wallet: w,
ethClient: ethClient,
Expand Down
Loading

0 comments on commit 3b05c71

Please sign in to comment.