Skip to content

Commit

Permalink
add component context / refactor logging
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Mar 29, 2024
1 parent e85c623 commit 15f3ee3
Show file tree
Hide file tree
Showing 39 changed files with 93 additions and 93 deletions.
2 changes: 1 addition & 1 deletion clients/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewRetrievalClient(
) (*retrievalClient, error) {

return &retrievalClient{
logger: logger,
logger: logger.With("component", "RetrievalClient"),
indexedChainState: chainState,
assignmentCoordinator: assignmentCoordinator,
nodeClient: nodeClient,
Expand Down
2 changes: 1 addition & 1 deletion common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewClient(cfg commonaws.ClientConfig, logger logging.Logger) (*Client, erro
return
}
dynamoClient := dynamodb.NewFromConfig(awsConfig)
clientRef = &Client{dynamoClient: dynamoClient, logger: logger}
clientRef = &Client{dynamoClient: dynamoClient, logger: logger.With("component", "dynamodb.Client")}
})
return clientRef, err
}
Expand Down
2 changes: 1 addition & 1 deletion common/aws/s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewClient(ctx context.Context, cfg commonaws.ClientConfig, logger logging.L
s3Client := s3.NewFromConfig(awsConfig, func(o *s3.Options) {
o.UsePathStyle = true
})
ref = &client{s3Client: s3Client, logger: logger}
ref = &client{s3Client: s3Client, logger: logger.With("component", "s3.Client")}
})
return ref, err
}
Expand Down
11 changes: 6 additions & 5 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ var _ common.EthClient = (*EthClient)(nil)
// NewClient creates a new Ethereum client.
// If PrivateKeyString in the config is empty, the client will not be able to send transactions, and it will use the senderAddress to create transactions.
// If PrivateKeyString in the config is not empty, the client will be able to send transactions, and the senderAddress is ignored.
func NewClient(config EthClientConfig, senderAddress gethcommon.Address, rpcIndex int, logger logging.Logger) (*EthClient, error) {
func NewClient(config EthClientConfig, senderAddress gethcommon.Address, rpcIndex int, _logger logging.Logger) (*EthClient, error) {
if rpcIndex >= len(config.RPCURLs) {
return nil, fmt.Errorf("NewClient: index out of bound, array size is %v, requested is %v", len(config.RPCURLs), rpcIndex)
}
logger := _logger.With("component", "EthClient")

rpcUrl := config.RPCURLs[rpcIndex]
chainClient, err := ethclient.Dial(rpcUrl)
Expand All @@ -63,7 +64,7 @@ func NewClient(config EthClientConfig, senderAddress gethcommon.Address, rpcInde
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)

if !ok {
logger.Error("NewClient: cannot get publicKeyECDSA")
logger.Error("cannot get publicKeyECDSA")
return nil, ErrCannotGetECDSAPubKey
}
accountAddress = crypto.PubkeyToAddress(*publicKeyECDSA)
Expand Down Expand Up @@ -224,7 +225,7 @@ func (c *EthClient) EstimateGasPriceAndLimitAndSendTx(
func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Transaction, tag string) (*types.Receipt, error) {
receipt, err := c.waitMined(ctx, []*types.Transaction{tx})
if err != nil {
return receipt, fmt.Errorf("EnsureTransactionEvaled: failed to wait for transaction (%s) to mine: %w", tag, err)
return receipt, fmt.Errorf("failed to wait for transaction (%s) to mine: %w", tag, err)
}
if receipt.Status != 1 {
c.Logger.Error("Transaction Failed", "tag", tag, "txHash", tx.Hash().Hex(), "status", receipt.Status, "GasUsed", receipt.GasUsed)
Expand Down Expand Up @@ -265,13 +266,13 @@ func (c *EthClient) waitMined(ctx context.Context, txs []*types.Transaction) (*t
chainTip, err := c.BlockNumber(ctx)
if err == nil {
if receipt.BlockNumber.Uint64()+uint64(c.numConfirmations) > chainTip {
c.Logger.Debug("EnsureTransactionEvaled: transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", c.numConfirmations, "chainTip", chainTip)
c.Logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", c.numConfirmations, "chainTip", chainTip)
break
} else {
return receipt, nil
}
} else {
c.Logger.Debug("EnsureTransactionEvaled: failed to get chain tip while waiting for transaction to mine", "err", err)
c.Logger.Debug("failed to get chain tip while waiting for transaction to mine", "err", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/geth/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type FailoverController struct {

func NewFailoverController(logger logging.Logger) *FailoverController {
return &FailoverController{
Logger: logger,
Logger: logger.With("component", "FailoverController"),
mu: &sync.RWMutex{},
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/geth/multihoming_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewMultiHomingClient(config EthClientConfig, senderAddress gethcommon.Addre
NumRetries: config.NumRetries,
FailoverController: FailoverController,
lastRPCIndex: 0,
Logger: logger,
Logger: logger.With("component", "MultiHomingClient"),
mu: sync.Mutex{},
}

Expand Down
2 changes: 1 addition & 1 deletion common/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewRateLimiter(rateParams common.GlobalRateParams, bucketStore BucketStore,
return &rateLimiter{
globalRateParams: rateParams,
bucketStore: bucketStore,
logger: logger,
logger: logger.With("component", "RateLimiter"),
}
}

Expand Down
12 changes: 6 additions & 6 deletions core/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewStdSignatureAggregator(logger logging.Logger, transactor Transactor) (*S
}

return &StdSignatureAggregator{
Logger: logger,
Logger: logger.With("component", "SignatureAggregator"),
Transactor: transactor,
OperatorAddresses: operatorAddrs,
}, nil
Expand Down Expand Up @@ -107,7 +107,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state
if !ok && a.Transactor != nil {
operatorAddr, err = a.Transactor.OperatorIDToAddress(ctx, r.Operator)
if err != nil {
a.Logger.Error("Failed to get operator address from registry", "operatorID", operatorIDHex)
a.Logger.Error("failed to get operator address from registry", "operatorID", operatorIDHex)
operatorAddr = gethcommon.Address{}
} else {
a.OperatorAddresses.Add(r.Operator, operatorAddr)
Expand All @@ -121,7 +121,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state
socket = op.Socket
}
if r.Err != nil {
a.Logger.Warn("[AggregateSignatures] error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "err", r.Err)
a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "err", r.Err)
continue
}

Expand All @@ -135,11 +135,11 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state
sig := r.Signature
ok = sig.Verify(op.PubkeyG2, message)
if !ok {
a.Logger.Error("Signature is not valid", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "pubkey", hexutil.Encode(op.PubkeyG2.Serialize()))
a.Logger.Error("signature is not valid", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "pubkey", hexutil.Encode(op.PubkeyG2.Serialize()))
continue
}

a.Logger.Info("[AggregateSignatures] received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket)
a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket)

for ind, id := range quorumIDs {

Expand All @@ -149,7 +149,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state

// If operator is not in quorum, skip
if !ok {
a.Logger.Error("Operator not found in quorum", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumID", id)
a.Logger.Error("operator not found in quorum", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumID", id)
continue
}

Expand Down
6 changes: 3 additions & 3 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewTransactor(

e := &Transactor{
EthClient: client,
Logger: logger,
Logger: logger.With("component", "Transactor"),
}

blsOperatorStateRetrieverAddr := gethcommon.HexToAddress(blsOperatorStateRetrieverHexAddr)
Expand Down Expand Up @@ -490,7 +490,7 @@ func (t *Transactor) BuildConfirmBatchTxn(ctx context.Context, batchHeader *core
SignedStakeForQuorums: signedStakeForQuorums,
ReferenceBlockNumber: uint32(batchHeader.ReferenceBlockNumber),
}
t.Logger.Debug("[ConfirmBatch] batch header", "batchHeaderReferenceBlock", batchH.ReferenceBlockNumber, "batchHeaderRoot", gethcommon.Bytes2Hex(batchH.BlobHeadersRoot[:]), "quorumNumbers", gethcommon.Bytes2Hex(batchH.QuorumNumbers), "quorumThresholdPercentages", gethcommon.Bytes2Hex(batchH.SignedStakeForQuorums))
t.Logger.Debug("batch header", "batchHeaderReferenceBlock", batchH.ReferenceBlockNumber, "batchHeaderRoot", gethcommon.Bytes2Hex(batchH.BlobHeadersRoot[:]), "quorumNumbers", gethcommon.Bytes2Hex(batchH.QuorumNumbers), "quorumThresholdPercentages", gethcommon.Bytes2Hex(batchH.SignedStakeForQuorums))

sigma := signatureToBN254G1Point(signatureAggregation.AggSignature)

Expand All @@ -513,7 +513,7 @@ func (t *Transactor) BuildConfirmBatchTxn(ctx context.Context, batchHeader *core
}
sigChecker, err := json.Marshal(signatureChecker)
if err == nil {
t.Logger.Debug("[ConfirmBatch] signature checker", "signatureChecker", string(sigChecker))
t.Logger.Debug("signature checker", "signatureChecker", string(sigChecker))
}

opts, err := t.EthClient.GetNoSendTransactOpts()
Expand Down
4 changes: 2 additions & 2 deletions core/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ func CreateNewIndexer(
gethClient dacommon.EthClient,
rpcClient dacommon.RPCEthClient,
eigenDAServiceManagerAddr string,
logger logging.Logger,
_logger logging.Logger,
) (indexer.Indexer, error) {

logger := _logger.With("component", "Indexer")
eigenDAServiceManager := common.HexToAddress(eigenDAServiceManagerAddr)

pubKeyFilterer, err := NewOperatorPubKeysFilterer(eigenDAServiceManager, gethClient)
Expand Down
2 changes: 1 addition & 1 deletion core/thegraph/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewIndexedChainState(cs core.ChainState, querier GraphQLQuerier, logger log
return &indexedChainState{
ChainState: cs,
querier: querier,
logger: logger,
logger: logger.With("component", "IndexedChainState"),
}
}

Expand Down
3 changes: 2 additions & 1 deletion disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ func NewDispersalServer(
serverConfig disperser.ServerConfig,
store disperser.BlobStore,
tx core.Transactor,
logger logging.Logger,
_logger logging.Logger,
metrics *disperser.Metrics,
ratelimiter common.RateLimiter,
rateConfig RateConfig,
) *DispersalServer {
logger := _logger.With("component", "DispersalServer")
for ip, rateInfoByQuorum := range rateConfig.Allowlist {
for quorumID, rateInfo := range rateInfoByQuorum {
logger.Info("[Allowlist]", "ip", ip, "quorumID", quorumID, "throughput", rateInfo.Throughput, "blobRate", rateInfo.BlobRate)
Expand Down
20 changes: 10 additions & 10 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func NewBatcher(

ethClient: ethClient,
finalizer: finalizer,
logger: logger,
logger: logger.With("component", "Batcher"),
HeartbeatChan: heartbeatChan,
}, nil
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func (b *Batcher) ProcessConfirmedBatch(ctx context.Context, receiptOrErr *Recei
b.logger.Error("failed to update confirmation info", "failed", len(blobsToRetry), "total", len(blobs))
_ = b.handleFailure(ctx, blobsToRetry, FailUpdateConfirmationInfo)
}
b.logger.Debug("[batcher] Update confirmation info took", "duration", time.Since(stageTimer))
b.logger.Debug("Update confirmation info took", "duration", time.Since(stageTimer))
b.Metrics.ObserveLatency("UpdateConfirmationInfo", float64(time.Since(stageTimer).Milliseconds()))
batchSize := int64(0)
for _, blobMeta := range blobs {
Expand Down Expand Up @@ -403,24 +403,24 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
if err != nil {
return err
}
log.Debug("[batcher] CreateBatch took", "duration", time.Since(stageTimer))
log.Debug("CreateBatch took", "duration", time.Since(stageTimer))

// Dispatch encoded batch
log.Debug("[batcher] Dispatching encoded batch...")
log.Debug("Dispatching encoded batch...")
stageTimer = time.Now()
update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader)
log.Debug("[batcher] DisperseBatch took", "duration", time.Since(stageTimer))
log.Debug("DisperseBatch took", "duration", time.Since(stageTimer))

// Get the batch header hash
log.Debug("[batcher] Getting batch header hash...")
log.Debug("Getting batch header hash...")
headerHash, err := batch.BatchHeader.GetBatchHeaderHash()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchHeaderHash)
return fmt.Errorf("HandleSingleBatch: error getting batch header hash: %w", err)
}

// Aggregate the signatures
log.Debug("[batcher] Aggregating signatures...")
log.Debug("Aggregating signatures...")

// construct quorumParams
quorumIDs := make([]core.QuorumID, 0, len(batch.State.AggKeys))
Expand All @@ -435,11 +435,11 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailAggregateSignatures)
return fmt.Errorf("HandleSingleBatch: error aggregating signatures: %w", err)
}
log.Debug("[batcher] AggregateSignatures took", "duration", time.Since(stageTimer))
log.Debug("AggregateSignatures took", "duration", time.Since(stageTimer))
b.Metrics.ObserveLatency("AggregateSignatures", float64(time.Since(stageTimer).Milliseconds()))
b.Metrics.UpdateAttestation(len(batch.State.IndexedOperators), len(aggSig.NonSigners), aggSig.QuorumResults)
for _, quorumResult := range aggSig.QuorumResults {
log.Info("[batcher] Aggregated quorum result", "quorumID", quorumResult.QuorumID, "percentSigned", quorumResult.PercentSigned)
log.Info("Aggregated quorum result", "quorumID", quorumResult.QuorumID, "percentSigned", quorumResult.PercentSigned)
}

numPassed := numBlobsAttested(aggSig.QuorumResults, batch.BlobHeaders)
Expand All @@ -450,7 +450,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
}

// Confirm the batch
log.Debug("[batcher] Confirming batch...")
log.Debug("Confirming batch...")

txn, err := b.Transactor.BuildConfirmBatchTxn(ctx, batch.BatchHeader, aggSig.QuorumResults, aggSig)
if err != nil {
Expand Down
26 changes: 13 additions & 13 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func NewEncodingStreamer(
assignmentCoordinator: assignmentCoordinator,
encodingCtxCancelFuncs: make([]context.CancelFunc, 0),
metrics: metrics,
logger: logger,
logger: logger.With("component", "EncodingStreamer"),
exclusiveStartKey: nil,
}, nil
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan
}
}

e.logger.Debug("[encodingstreamer] metadata in processing status", "numMetadata", len(metadatas))
e.logger.Debug("metadata in processing status", "numMetadata", len(metadatas))
metadatas = e.dedupRequests(metadatas, referenceBlockNumber)
if len(metadatas) == 0 {
e.logger.Info("no new metadatas to encode")
Expand All @@ -235,14 +235,14 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan
}
if numMetadatastoProcess <= 0 {
// encoding queue is full
e.logger.Warn("[RequestEncoding] worker pool queue is full. skipping this round of encoding requests", "waitingQueueSize", waitingQueueSize, "encodingQueueLimit", e.EncodingQueueLimit)
e.logger.Warn("worker pool queue is full. skipping this round of encoding requests", "waitingQueueSize", waitingQueueSize, "encodingQueueLimit", e.EncodingQueueLimit)
return nil
}
// only process subset of blobs so it doesn't exceed the EncodingQueueLimit
// TODO: this should be done at the request time and keep the cursor so that we don't fetch the same metadata every time
metadatas = metadatas[:numMetadatastoProcess]

e.logger.Debug("[encodingstreamer] new metadatas to encode", "numMetadata", len(metadatas), "duration", time.Since(stageTimer))
e.logger.Debug("new metadatas to encode", "numMetadata", len(metadatas), "duration", time.Since(stageTimer))

// Get the operator state
state, err := e.getOperatorState(ctx, metadatas, referenceBlockNumber)
Expand All @@ -261,9 +261,9 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan
if err != nil {
return fmt.Errorf("error getting blobs from blob store: %w", err)
}
e.logger.Debug("[RequestEncoding] retrieved blobs to encode", "numBlobs", len(blobs), "duration", time.Since(stageTimer))
e.logger.Debug("retrieved blobs to encode", "numBlobs", len(blobs), "duration", time.Since(stageTimer))

e.logger.Debug("[RequestEncoding] encoding blobs...", "numBlobs", len(blobs), "blockNumber", referenceBlockNumber)
e.logger.Debug("encoding blobs...", "numBlobs", len(blobs), "blockNumber", referenceBlockNumber)

for i := range metadatas {
metadata := metadatas[i]
Expand Down Expand Up @@ -301,7 +301,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata

chunkLength, err := e.assignmentCoordinator.CalculateChunkLength(state.OperatorState, blobLength, e.StreamerConfig.TargetNumChunks, quorum)
if err != nil {
e.logger.Error("[RequestEncodingForBlob] error calculating chunk length", "err", err)
e.logger.Error("error calculating chunk length", "err", err)
continue
}

Expand All @@ -316,19 +316,19 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata
}
assignments, info, err := e.assignmentCoordinator.GetAssignments(state.OperatorState, blobLength, blobQuorumInfo)
if err != nil {
e.logger.Error("[RequestEncodingForBlob] error getting assignments", "err", err)
e.logger.Error("error getting assignments", "err", err)
continue
}

params := encoding.ParamsFromMins(chunkLength, info.TotalChunks)

err = encoding.ValidateEncodingParams(params, int(blobLength), e.SRSOrder)
if err != nil {
e.logger.Error("[RequestEncodingForBlob] invalid encoding params", "err", err)
e.logger.Error("invalid encoding params", "err", err)
// Cancel the blob
err := e.blobStore.MarkBlobFailed(ctx, blobKey)
if err != nil {
e.logger.Error("[RequestEncodingForBlob] error marking blob failed", "err", err)
e.logger.Error("error marking blob failed", "err", err)
}
return
}
Expand Down Expand Up @@ -422,7 +422,7 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
// Cancel outstanding encoding requests
// Assumption: `CreateBatch` will be called at an interval longer than time it takes to encode a single blob
if len(e.encodingCtxCancelFuncs) > 0 {
e.logger.Info("[CreateBatch] canceling outstanding encoding requests", "count", len(e.encodingCtxCancelFuncs))
e.logger.Info("canceling outstanding encoding requests", "count", len(e.encodingCtxCancelFuncs))
for _, cancel := range e.encodingCtxCancelFuncs {
cancel()
}
Expand All @@ -433,7 +433,7 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
if e.ReferenceBlockNumber == 0 {
blockNumber, err := e.chainState.GetCurrentBlockNumber()
if err != nil {
e.logger.Error("[CreateBatch] failed to get current block number. will not clean up the encoded blob store.", "err", err)
e.logger.Error("failed to get current block number. will not clean up the encoded blob store.", "err", err)
} else {
_ = e.EncodedBlobstore.GetNewAndDeleteStaleEncodingResults(blockNumber)
}
Expand All @@ -449,7 +449,7 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) {
e.EncodedSizeNotifier.active = true
e.EncodedSizeNotifier.mu.Unlock()

e.logger.Info("[CreateBatch] creating a batch...", "numBlobs", len(encodedResults), "refblockNumber", e.ReferenceBlockNumber)
e.logger.Info("creating a batch...", "numBlobs", len(encodedResults), "refblockNumber", e.ReferenceBlockNumber)
if len(encodedResults) == 0 {
return nil, errNoEncodedResults
}
Expand Down
Loading

0 comments on commit 15f3ee3

Please sign in to comment.