Skip to content

Commit

Permalink
[Batcher] Add Readiness and Liveness Health Probe (#203)
Browse files Browse the repository at this point in the history
Co-authored-by: Siddharth More <Siddhi More>
  • Loading branch information
siddimore authored Feb 21, 2024
1 parent 402eafb commit 8d5eb99
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 25 deletions.
31 changes: 25 additions & 6 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ type Batcher struct {
TransactionManager TxnManager
Metrics *Metrics

ethClient common.EthClient
finalizer Finalizer
logger common.Logger
ethClient common.EthClient
finalizer Finalizer
logger common.Logger
HeartbeatChan chan time.Time
}

func NewBatcher(
Expand All @@ -96,6 +97,7 @@ func NewBatcher(
txnManager TxnManager,
logger common.Logger,
metrics *Metrics,
heartbeatChan chan time.Time,
) (*Batcher, error) {
batchTrigger := NewEncodedSizeNotifier(
make(chan struct{}, 1),
Expand Down Expand Up @@ -130,9 +132,10 @@ func NewBatcher(
TransactionManager: txnManager,
Metrics: metrics,

ethClient: ethClient,
finalizer: finalizer,
logger: logger,
ethClient: ethClient,
finalizer: finalizer,
logger: logger,
HeartbeatChan: heartbeatChan,
}, nil
}

Expand Down Expand Up @@ -187,6 +190,7 @@ func (b *Batcher) Start(ctx context.Context) error {
}
case <-batchTrigger.Notify:
ticker.Stop()

if err := b.HandleSingleBatch(ctx); err != nil {
if errors.Is(err, errNoEncodedResults) {
b.logger.Warn("no encoded results to make a batch with")
Expand Down Expand Up @@ -379,6 +383,10 @@ type confirmationMetadata struct {

func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
log := b.logger

// Signal Liveness to indicate no stall
b.signalLiveness()

// start a timer
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
b.Metrics.ObserveLatency("total", f*1000) // make milliseconds
Expand Down Expand Up @@ -576,3 +584,14 @@ func isBlobAttested(signedQuorums map[core.QuorumID]*core.QuorumResult, header *
}
return true
}

// signalLiveness attempts a non-blocking send of the current time on
// b.HeartbeatChan and logs whether the heartbeat was accepted.
//
// The send never blocks: if the channel cannot take the value right now
// (no receiver is waiting and no buffer slot is free), the heartbeat is
// dropped and a warning is logged instead.
func (b *Batcher) signalLiveness() {
	heartbeat := time.Now()

	delivered := false
	select {
	case b.HeartbeatChan <- heartbeat:
		delivered = true
	default:
		// Nothing could accept the heartbeat immediately; fall through
		// rather than stalling the calling goroutine.
	}

	if !delivered {
		b.logger.Warn("Heartbeat signal skipped, no receiver on the channel")
		return
	}
	b.logger.Info("Heartbeat signal sent")
}
75 changes: 60 additions & 15 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
)

var (
gettysburgAddressBytes = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.")
gettysburgAddressBytes = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.")
handleBatchLivenessChan = make(chan time.Time, 1)
)

type batcherComponents struct {
transactor *coremock.MockTransactor
txnManager *mock.MockTxnManager
txnManager *batchermock.MockTxnManager
blobStore disperser.BlobStore
encoderClient *disperser.LocalEncoderClient
encodingStreamer *bat.EncodingStreamer
Expand Down Expand Up @@ -64,7 +65,7 @@ func makeTestBlob(securityParams []*core.SecurityParam) core.Blob {
return blob
}

func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) {
func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.Time) {
// Common Components
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
Expand Down Expand Up @@ -110,18 +111,35 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) {
ethClient := &cmock.MockEthClient{}
txnManager := mock.NewTxnManager()

b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics)
b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan)
assert.NoError(t, err)

var heartbeatsReceived []time.Time
doneListening := make(chan bool)

go func() {
for {
select {
case hb := <-b.HeartbeatChan:
heartbeatsReceived = append(heartbeatsReceived, hb)
case <-doneListening:
return
}
}
}()

// Make the batcher
return &batcherComponents{
transactor: transactor,
txnManager: txnManager,
blobStore: blobStore,
encoderClient: encoderClient,
encodingStreamer: b.EncodingStreamer,
ethClient: ethClient,
}, b
transactor: transactor,
txnManager: txnManager,
blobStore: blobStore,
encoderClient: encoderClient,
encodingStreamer: b.EncodingStreamer,
ethClient: ethClient,
}, b, func() []time.Time {
close(doneListening) // Stop the goroutine listening to heartbeats
return heartbeatsReceived
}
}

func queueBlob(t *testing.T, ctx context.Context, blob *core.Blob, blobStore disperser.BlobStore) (uint64, disperser.BlobKey) {
Expand All @@ -143,7 +161,15 @@ func TestBatcherIterations(t *testing.T) {
AdversaryThreshold: 70,
QuorumThreshold: 100,
}})
components, batcher := makeBatcher(t)
components, batcher, getHeartbeats := makeBatcher(t)

defer func() {
heartbeats := getHeartbeats()
assert.NotEmpty(t, heartbeats, "Expected heartbeats, but none were received")

// Further assertions can be made here, such as checking the number of heartbeats
// or validating the time intervals between them if needed.
}()
// should be encoding 3 and 0
logData, err := hex.DecodeString("00000000000000000000000000000000000000000000000000000000000000030000000000000000000000000000000000000000000000000000000000000000")
assert.NoError(t, err)
Expand Down Expand Up @@ -233,7 +259,13 @@ func TestBlobFailures(t *testing.T) {
QuorumThreshold: 100,
}})

components, batcher := makeBatcher(t)
components, batcher, getHeartbeats := makeBatcher(t)

defer func() {
heartbeats := getHeartbeats()
assert.Equal(t, 3, len(heartbeats), "Expected heartbeats, but none were received")
}()

confirmationErr := fmt.Errorf("error")
blobStore := components.blobStore
ctx := context.Background()
Expand Down Expand Up @@ -333,7 +365,13 @@ func TestBlobRetry(t *testing.T) {
QuorumThreshold: 100,
}})

components, batcher := makeBatcher(t)
components, batcher, getHeartbeats := makeBatcher(t)

defer func() {
heartbeats := getHeartbeats()
assert.Equal(t, 1, len(heartbeats), "Expected heartbeats, but none were received")
}()

blobStore := components.blobStore
ctx := context.Background()
_, blobKey := queueBlob(t, ctx, &blob, blobStore)
Expand Down Expand Up @@ -430,8 +468,15 @@ func TestRetryTxnReceipt(t *testing.T) {
AdversaryThreshold: 80,
QuorumThreshold: 100,
}})
components, batcher := makeBatcher(t)
components, batcher, getHeartbeats := makeBatcher(t)

defer func() {
heartbeats := getHeartbeats()
assert.NotEmpty(t, heartbeats, "Expected heartbeats, but none were received")

// Further assertions can be made here, such as checking the number of heartbeats
// or validating the time intervals between them if needed.
}()
invalidReceipt := &types.Receipt{
Logs: []*types.Log{
{
Expand Down
54 changes: 53 additions & 1 deletion disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ var (
version string
gitCommit string
gitDate string
// Note: Changing these paths will require updating the k8s deployment
readinessProbePath string = "/tmp/ready"
healthProbePath string = "/tmp/health"
maxStallDuration time.Duration = 240 * time.Second
handleBatchLivenessChan = make(chan time.Time, 1)
)

func main() {
Expand All @@ -48,10 +53,23 @@ func main() {
log.Fatalf("application failed: %v", err)
}

if _, err := os.Create(healthProbePath); err != nil {
log.Printf("Failed to create healthProbe file: %v", err)
}

// Start HeartBeat Monitor
go heartbeatMonitor(healthProbePath, maxStallDuration)

select {}
}

func RunBatcher(ctx *cli.Context) error {

// Clean up readiness file
if err := os.Remove(readinessProbePath); err != nil {
log.Printf("Failed to clean up readiness file: %v at path %v \n", err, readinessProbePath)
}

config := NewConfig(ctx)

logger, err := logging.GetLogger(config.LoggerConfig)
Expand Down Expand Up @@ -142,7 +160,7 @@ func RunBatcher(ctx *cli.Context) error {
}
finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, 1000, config.BatcherConfig.FinalizerPoolSize, logger, metrics.FinalizerMetrics)
txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics)
batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics)
batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
if err != nil {
return err
}
Expand All @@ -159,6 +177,40 @@ func RunBatcher(ctx *cli.Context) error {
return err
}

// Signal readiness
if _, err := os.Create(readinessProbePath); err != nil {
log.Printf("Failed to create readiness file: %v at path %v \n", err, readinessProbePath)
}

return nil

}

// heartbeatMonitor consumes liveness signals from handleBatchLivenessChan and
// records each one in the file at filePath.
//
// Every received heartbeat overwrites filePath with the heartbeat's timestamp
// and restarts the stall timer. The function returns when the channel is
// closed, or when no heartbeat arrives within maxStallDuration; once it has
// returned, the file is no longer refreshed.
func heartbeatMonitor(filePath string, maxStallDuration time.Duration) {
	stallTimer := time.NewTimer(maxStallDuration)
	// Release the timer on every exit path.
	defer stallTimer.Stop()

	for {
		select {
		// HeartBeat from Goroutine on Batcher Pull Interval
		case heartbeat, ok := <-handleBatchLivenessChan:
			if !ok {
				log.Println("handleBatchLivenessChan closed, stopping health probe")
				return
			}
			log.Printf("Received heartbeat from HandleBatch GoRoutine: %v\n", heartbeat)
			if err := os.WriteFile(filePath, []byte(heartbeat.String()), 0666); err != nil {
				log.Printf("Failed to update heartbeat file: %v", err)
			} else {
				log.Printf("Updated heartbeat file: %v with time %v\n", filePath, heartbeat)
			}

			// Restart the stall window from this heartbeat. On Go versions
			// before 1.23, Reset must only be called on a stopped timer whose
			// channel has been drained; otherwise a tick that fired while the
			// heartbeat was being handled stays queued and would be read on
			// the next iteration as a bogus stall. The non-blocking drain is
			// harmless on newer Go versions.
			if !stallTimer.Stop() {
				select {
				case <-stallTimer.C:
				default:
				}
			}
			stallTimer.Reset(maxStallDuration)

		case <-stallTimer.C:
			log.Println("No heartbeat received within max stall duration, stopping health probe")
			return
		}
	}
}
7 changes: 4 additions & 3 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ var (
enc core.Encoder
asn core.AssignmentCoordinator

gettysburgAddressBytes = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.")
serviceManagerAddress = gethcommon.HexToAddress("0x0000000000000000000000000000000000000000")
gettysburgAddressBytes = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.")
serviceManagerAddress = gethcommon.HexToAddress("0x0000000000000000000000000000000000000000")
handleBatchLivenessChan = make(chan time.Time, 1)
)

const (
Expand Down Expand Up @@ -163,7 +164,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
batcherMetrics := batcher.NewMetrics("9100", logger)
txnManager := batchermock.NewTxnManager()

batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics)
batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics, handleBatchLivenessChan)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 8d5eb99

Please sign in to comment.