Skip to content

Commit

Permalink
Get a list of operators which exited the system (either due to opt-ou…
Browse files Browse the repository at this point in the history
…t, churn, or booting) within the last 14 days. (#131)

Co-authored-by: Wellington Barbosa <[email protected]>
  • Loading branch information
wmb-software-consulting and Wellington Barbosa authored Dec 20, 2023
1 parent 49615e4 commit 7f60e15
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 21 deletions.
12 changes: 6 additions & 6 deletions core/thegraph/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
)

const (
defaultInterval = time.Second
maxInterval = 5 * time.Minute
MAX_ENTRIES_PER_QUERY = 1000
startRetriesInterval = time.Second * 5
startMaxRetries = 6
defaultInterval = time.Second
maxInterval = 5 * time.Minute
maxEntriesPerQuery = 1000
startRetriesInterval = time.Second * 5
startMaxRetries = 6
)

type (
Expand Down Expand Up @@ -235,7 +235,7 @@ func (ics *indexedChainState) getAllOperatorsRegisteredAtBlockNumberWithPaginati
var (
query QueryOperatorsGql
variables = map[string]any{
"first": graphql.Int(MAX_ENTRIES_PER_QUERY),
"first": graphql.Int(maxEntriesPerQuery),
"skip": graphql.Int(len(operators)), // skip is the number of operators already retrieved
"blockNumber": graphql.Int(blockNumber),
}
Expand Down
46 changes: 39 additions & 7 deletions disperser/dataapi/subgraph/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@ package subgraph

import (
"context"
"encoding/hex"
"fmt"
"sync"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/shurcooL/graphql"
)

var (
once sync.Once
instance *api
MAX_ENTITIES_PER_QUERY = 1000
once sync.Once
instance *api
maxEntriesPerQuery = 1000
)

type (
Api interface {
QueryBatches(ctx context.Context, descending bool, orderByField string, first, skip int) ([]*Batches, error)
QueryOperators(ctx context.Context, first int) ([]*OperatorRegistered, error)
QueryOperators(ctx context.Context, first int) ([]*Operator, error)
QueryBatchNonSigningOperatorIdsInInterval(ctx context.Context, intervalSeconds int64) ([]*BatchNonSigningOperatorIds, error)
QueryDeregisteredOperatorsGreaterThanBlockTimestamp(ctx context.Context, blockTimestamp uint64) ([]*Operator, error)
QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (*IndexedOperatorInfo, error)
}

api struct {
Expand Down Expand Up @@ -61,7 +66,7 @@ func (a *api) QueryBatches(ctx context.Context, descending bool, orderByField st
return result.Batches, nil
}

func (a *api) QueryOperators(ctx context.Context, first int) ([]*OperatorRegistered, error) {
func (a *api) QueryOperators(ctx context.Context, first int) ([]*Operator, error) {
variables := map[string]any{
"first": graphql.Int(first),
}
Expand All @@ -84,7 +89,7 @@ func (a *api) QueryBatchNonSigningOperatorIdsInInterval(ctx context.Context, int
result := new(queryBatchNonSigningOperatorIdsInInterval)
batchNonSigningOperatorIds := make([]*BatchNonSigningOperatorIds, 0)
for {
variables["first"] = graphql.Int(MAX_ENTITIES_PER_QUERY)
variables["first"] = graphql.Int(maxEntriesPerQuery)
variables["skip"] = graphql.Int(skip)

err := a.uiMonitoringGgl.Query(ctx, &result, variables)
Expand All @@ -97,9 +102,36 @@ func (a *api) QueryBatchNonSigningOperatorIdsInInterval(ctx context.Context, int
}
batchNonSigningOperatorIds = append(batchNonSigningOperatorIds, result.BatchNonSigningOperatorIds...)

skip += MAX_ENTITIES_PER_QUERY
skip += maxEntriesPerQuery
}

result.BatchNonSigningOperatorIds = batchNonSigningOperatorIds
return result.BatchNonSigningOperatorIds, nil
}

func (a *api) QueryDeregisteredOperatorsGreaterThanBlockTimestamp(ctx context.Context, blockTimestamp uint64) ([]*Operator, error) {
variables := map[string]any{
"blockTimestamp_gt": graphql.Int(blockTimestamp),
}
query := new(queryOperatorDeregistereds)
err := a.operatorStateGql.Query(ctx, &query, variables)
if err != nil {
return nil, err
}
return query.OperatorDeregistereds, nil
}

func (a *api) QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (*IndexedOperatorInfo, error) {
var (
query queryOperatorById
variables = map[string]any{
"id": graphql.String(fmt.Sprintf("0x%s", hex.EncodeToString(operatorId[:]))),
}
)
err := a.operatorStateGql.Query(context.Background(), &query, variables)
if err != nil {
return nil, err
}

return &query.Operator, nil
}
29 changes: 26 additions & 3 deletions disperser/dataapi/subgraph/mock/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"slices"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -41,12 +42,12 @@ func (m *MockSubgraphApi) QueryBatches(ctx context.Context, descending bool, ord
return value, args.Error(1)
}

func (m *MockSubgraphApi) QueryOperators(ctx context.Context, first int) ([]*subgraph.OperatorRegistered, error) {
func (m *MockSubgraphApi) QueryOperators(ctx context.Context, first int) ([]*subgraph.Operator, error) {
args := m.Called()

var value []*subgraph.OperatorRegistered
var value []*subgraph.Operator
if args.Get(0) != nil {
value = args.Get(0).([]*subgraph.OperatorRegistered)
value = args.Get(0).([]*subgraph.Operator)

if len(value) > first {
value = value[:first]
Expand All @@ -70,3 +71,25 @@ func (m *MockSubgraphApi) QueryBatchNonSigningOperatorIdsInInterval(ctx context.

return value, args.Error(1)
}

func (m *MockSubgraphApi) QueryDeregisteredOperatorsGreaterThanBlockTimestamp(ctx context.Context, blockTimestamp uint64) ([]*subgraph.Operator, error) {
args := m.Called()

var value []*subgraph.Operator
if args.Get(0) != nil {
value = args.Get(0).([]*subgraph.Operator)
}

return value, args.Error(1)
}

func (m *MockSubgraphApi) QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, operatorId core.OperatorID, blockNumber uint32) (*subgraph.IndexedOperatorInfo, error) {
args := m.Called()

var value *subgraph.IndexedOperatorInfo
if args.Get(0) != nil {
value = args.Get(0).(*subgraph.IndexedOperatorInfo)
}

return value, args.Error(1)
}
26 changes: 23 additions & 3 deletions disperser/dataapi/subgraph/queries.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package subgraph

import "github.com/shurcooL/graphql"
import (
"github.com/shurcooL/graphql"
)

type (
Batches struct {
Expand All @@ -18,7 +20,7 @@ type (
GasPrice graphql.String
TxFee graphql.String
}
OperatorRegistered struct {
Operator struct {
Id graphql.String
OperatorId graphql.String
Operator graphql.String
Expand All @@ -33,13 +35,31 @@ type (
} `graphql:"nonSigners"`
} `graphql:"nonSigning"`
}
SocketUpdates struct {
Socket graphql.String
}
IndexedOperatorInfo struct {
Id graphql.String
PubkeyG1_X graphql.String `graphql:"pubkeyG1_X"`
PubkeyG1_Y graphql.String `graphql:"pubkeyG1_Y"`
PubkeyG2_X []graphql.String `graphql:"pubkeyG2_X"`
PubkeyG2_Y []graphql.String `graphql:"pubkeyG2_Y"`
// Socket is the socket address of the operator, in the form "host:port"
SocketUpdates []SocketUpdates `graphql:"socketUpdates(first: 1, orderBy: blockNumber, orderDirection: desc)"`
}
queryBatches struct {
Batches []*Batches `graphql:"batches(orderDirection: $orderDirection, orderBy: $orderBy, first: $first, skip: $skip)"`
}
queryOperatorRegistereds struct {
OperatorRegistereds []*OperatorRegistered `graphql:"operatorRegistereds(first: $first)"`
OperatorRegistereds []*Operator `graphql:"operatorRegistereds(first: $first)"`
}
queryBatchNonSigningOperatorIdsInInterval struct {
BatchNonSigningOperatorIds []*BatchNonSigningOperatorIds `graphql:"batches(first: $first, skip: $skip, where: {blockTimestamp_gt: $blockTimestamp_gt})"`
}
queryOperatorDeregistereds struct {
OperatorDeregistereds []*Operator `graphql:"operatorDeregistereds(where: {blockTimestamp_gt: $blockTimestamp_gt})"`
}
queryOperatorById struct {
Operator IndexedOperatorInfo `graphql:"operator(id: $id)"`
}
)
53 changes: 52 additions & 1 deletion disperser/dataapi/subgraph_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@ package dataapi
import (
"context"
"strconv"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
)

const _14Days = 14 * 24 * time.Hour

type (
SubgraphClient interface {
QueryBatchesWithLimit(ctx context.Context, limit, skip int) ([]*Batch, error)
QueryOperatorsWithLimit(ctx context.Context, limit int) ([]*Operator, error)
QueryBatchNonSigningOperatorIdsInInterval(ctx context.Context, intervalSeconds int64) (map[string]int, error)
QueryIndexedDeregisteredOperatorsInTheLast14Days(ctx context.Context) (*IndexedDeregisteredOperatorState, error)
}
Batch struct {
Id []byte
Expand All @@ -36,6 +41,14 @@ type (
BlockNumber uint64
TransactionHash []byte
}
DeregisteredOperatorInfo struct {
*core.IndexedOperatorInfo
// BlockNumber is the block number at which the operator was deregistered.
BlockNumber uint
}
IndexedDeregisteredOperatorState struct {
Operators map[core.OperatorID]*DeregisteredOperatorInfo
}
subgraphClient struct {
api subgraph.Api
}
Expand Down Expand Up @@ -93,6 +106,44 @@ func (sc *subgraphClient) QueryBatchNonSigningOperatorIdsInInterval(ctx context.
return batchNonSigningOperatorIds, nil
}

func (sc *subgraphClient) QueryIndexedDeregisteredOperatorsInTheLast14Days(ctx context.Context) (*IndexedDeregisteredOperatorState, error) {
last14Days := uint64(time.Now().Add(-_14Days).Unix())
deregisteredOperators, err := sc.api.QueryDeregisteredOperatorsGreaterThanBlockTimestamp(ctx, last14Days)
if err != nil {
return nil, err
}

operators := make(map[core.OperatorID]*DeregisteredOperatorInfo, len(deregisteredOperators))
for i := range deregisteredOperators {
deregisteredOperator := deregisteredOperators[i]
operator, err := convertOperator(deregisteredOperator)
if err != nil {
return nil, err
}

var operatorId [32]byte
copy(operatorId[:], operator.OperatorId)

operatorInfo, err := sc.api.QueryOperatorInfoByOperatorIdAtBlockNumber(ctx, operatorId, uint32(operator.BlockNumber))
if err != nil {
return nil, err
}
indexedOperatorInfo, err := ConvertOperatorInfoGqlToIndexedOperatorInfo(operatorInfo)
if err != nil {
return nil, err
}

operators[operatorId] = &DeregisteredOperatorInfo{
IndexedOperatorInfo: indexedOperatorInfo,
BlockNumber: uint(operator.BlockNumber),
}
}

return &IndexedDeregisteredOperatorState{
Operators: operators,
}, nil
}

func convertBatches(subgraphBatches []*subgraph.Batches) ([]*Batch, error) {
batches := make([]*Batch, len(subgraphBatches))
for i, batch := range subgraphBatches {
Expand Down Expand Up @@ -147,7 +198,7 @@ func convertGasFees(gasFees subgraph.GasFees) (*GasFees, error) {
}, nil
}

func convertOperator(operator *subgraph.OperatorRegistered) (*Operator, error) {
func convertOperator(operator *subgraph.Operator) (*Operator, error) {
timestamp, err := strconv.ParseUint(string(operator.BlockTimestamp), 10, 64)
if err != nil {
return nil, err
Expand Down
59 changes: 58 additions & 1 deletion disperser/dataapi/subgraph_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"github.com/Layr-Labs/eigenda/disperser/dataapi"
"github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph"
subgraphmock "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph/mock"
"github.com/shurcooL/graphql"
"github.com/stretchr/testify/assert"
)

var (
subgraphOperatorRegistereds = []*subgraph.OperatorRegistered{
subgraphOperatorRegistereds = []*subgraph.Operator{
{
Id: "0x000763fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f211",
OperatorId: "0xe1cdae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311",
Expand All @@ -30,6 +31,17 @@ var (
},
}

subgraphOperatorDeregistereds = []*subgraph.Operator{
{
Id: "0x000763fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f222",
OperatorId: "0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311",
Operator: "0x000223fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f211",
BlockTimestamp: "1702666046",
BlockNumber: "22",
TransactionHash: "0x000223fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f211",
},
}

subgraphBatches = []*subgraph.Batches{
{
Id: "0x000763fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f207",
Expand Down Expand Up @@ -74,6 +86,25 @@ var (
},
},
}

subgraphIndexedOperatorInfos = &subgraph.IndexedOperatorInfo{
Id: "0x000223fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f222",
PubkeyG1_X: "3336192159512049190945679273141887248666932624338963482128432381981287252980",
PubkeyG1_Y: "15195175002875833468883745675063986308012687914999552116603423331534089122704",
PubkeyG2_X: []graphql.String{
"21597023645215426396093421944506635812143308313031252511177204078669540440732",
"11405255666568400552575831267661419473985517916677491029848981743882451844775",
},
PubkeyG2_Y: []graphql.String{
"9416989242565286095121881312760798075882411191579108217086927390793923664442",
"13612061731370453436662267863740141021994163834412349567410746669651828926551",
},
SocketUpdates: []subgraph.SocketUpdates{
{
Socket: "localhost:32006;32007",
},
},
}
)

func TestQueryBatchesWithLimit(t *testing.T) {
Expand Down Expand Up @@ -128,6 +159,32 @@ func TestQueryOperators(t *testing.T) {
assert.Equal(t, []byte("0x000163fb86a79eda47c891d8826474d80b6a935ad2a2b5de921933e05c67f320f212"), operators[1].TransactionHash)
}

func TestQueryIndexedDeregisteredOperatorsInTheLast14Days(t *testing.T) {
mockSubgraphApi := &subgraphmock.MockSubgraphApi{}
mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil)
mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfos, nil)
subgraphClient := dataapi.NewSubgraphClient(mockSubgraphApi)
indexedDeregisteredOperatorState, err := subgraphClient.QueryIndexedDeregisteredOperatorsInTheLast14Days(context.Background())
assert.NoError(t, err)

operators := indexedDeregisteredOperatorState.Operators
assert.Equal(t, 1, len(operators))

var operatorId [32]byte
copy(operatorId[:], []byte("0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311"))
operator := operators[operatorId]

assert.NotNil(t, operator)

expectedIndexedOperatorInfo, err := dataapi.ConvertOperatorInfoGqlToIndexedOperatorInfo(subgraphIndexedOperatorInfos)
assert.NoError(t, err)

assert.Equal(t, expectedIndexedOperatorInfo.PubkeyG1, operator.PubkeyG1)
assert.Equal(t, expectedIndexedOperatorInfo.PubkeyG2, operator.PubkeyG2)
assert.Equal(t, "localhost:32006;32007", operator.Socket)
assert.Equal(t, uint64(22), uint64(operator.BlockNumber))
}

func assertGasFees(t *testing.T, gasFees *dataapi.GasFees) {
assert.NotNil(t, gasFees)
assert.Equal(t, []byte("0x0006afd9ce41ba0f3414ba2650a9cd2f47c0e22af21651f7fd902f71df678c5d9942"), gasFees.Id)
Expand Down
Loading

0 comments on commit 7f60e15

Please sign in to comment.