From 2268e52e4943ca0b001fae28005fb3fd5d3b33a5 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Tue, 6 Aug 2024 10:04:56 -0500 Subject: [PATCH 01/10] Split blob writer code out of larger PR. Signed-off-by: Cody Littley --- tools/traffic/test/blob_writer_test.go | 143 +++++++++++++++ tools/traffic/test/mock_disperser.go | 96 ++++++++++ .../traffic/test/mock_interceptable_ticker.go | 27 +++ tools/traffic/test/mock_key_handler.go | 38 ++++ tools/traffic/workers/blob_writer.go | 166 ++++++++++++++++++ tools/traffic/workers/interceptable_ticker.go | 26 +++ tools/traffic/workers/key_handler.go | 7 + 7 files changed, 503 insertions(+) create mode 100644 tools/traffic/test/blob_writer_test.go create mode 100644 tools/traffic/test/mock_disperser.go create mode 100644 tools/traffic/test/mock_interceptable_ticker.go create mode 100644 tools/traffic/test/mock_key_handler.go create mode 100644 tools/traffic/workers/blob_writer.go create mode 100644 tools/traffic/workers/interceptable_ticker.go create mode 100644 tools/traffic/workers/key_handler.go diff --git a/tools/traffic/test/blob_writer_test.go b/tools/traffic/test/blob_writer_test.go new file mode 100644 index 0000000000..191aeb03d7 --- /dev/null +++ b/tools/traffic/test/blob_writer_test.go @@ -0,0 +1,143 @@ +package test + +import ( + "context" + "crypto/md5" + "fmt" + "github.com/Layr-Labs/eigenda/common" + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/tools/traffic/config" + "github.com/Layr-Labs/eigenda/tools/traffic/metrics" + "github.com/Layr-Labs/eigenda/tools/traffic/workers" + "github.com/stretchr/testify/assert" + "golang.org/x/exp/rand" + "sync" + "testing" + "time" +) + +func TestBlobWriter(t *testing.T) { + tu.InitializeRandom() + + ctx, cancel := context.WithCancel(context.Background()) + waitGroup := sync.WaitGroup{} + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + assert.Nil(t, err) + startTime := time.Unix(rand.Int63()%2_000_000_000, 0) + ticker := newMockTicker(startTime) + + dataSize := rand.Uint64()%1024 + 64 + + authenticated := rand.Intn(2) == 0 + var signerPrivateKey string + if authenticated { + signerPrivateKey = "asdf" + } + + randomizeBlobs := rand.Intn(2) == 0 + + useCustomQuorum := rand.Intn(2) == 0 + var customQuorum []uint8 + if useCustomQuorum { + customQuorum = []uint8{1, 2, 3} + } + + config := &config.WorkerConfig{ + DataSize: dataSize, + SignerPrivateKey: signerPrivateKey, + RandomizeBlobs: randomizeBlobs, + CustomQuorums: customQuorum, + } + + lock := sync.Mutex{} + + disperserClient := newMockDisperserClient(t, &lock, authenticated) + unconfirmedKeyHandler := newMockKeyHandler(t, &lock) + + generatorMetrics := metrics.NewMockMetrics() + + writer := workers.NewBlobWriter( + &ctx, + &waitGroup, + logger, + ticker, + config, + disperserClient, + unconfirmedKeyHandler, + generatorMetrics) + writer.Start() + + errorProbability := 0.1 + errorCount := 0 + + var previousData []byte + + for i := 0; i < 100; i++ { + if rand.Float64() < errorProbability { + disperserClient.DispenseErrorToReturn = fmt.Errorf("intentional error for testing purposes") + errorCount++ + } else { + disperserClient.DispenseErrorToReturn = nil + } + + // This is the key that will be assigned to the next blob. + disperserClient.KeyToReturn = make([]byte, 32) + _, err = rand.Read(disperserClient.KeyToReturn) + assert.Nil(t, err) + + // Move time forward, allowing the writer to attempt to send a blob. + ticker.Tick(1 * time.Second) + + // Wait until the writer finishes its work. + tu.AssertEventuallyTrue(t, func() bool { + lock.Lock() + defer lock.Unlock() + return int(disperserClient.DisperseCount) > i && int(unconfirmedKeyHandler.Count)+errorCount > i + }, time.Second) + + // These methods should be called exactly once per tick if there are no errors. + // In the presence of errors, nothing should be passed to the unconfirmed key handler. + assert.Equal(t, uint(i+1), disperserClient.DisperseCount) + assert.Equal(t, uint(i+1-errorCount), unconfirmedKeyHandler.Count) + + // This method should not be called in this test. + assert.Equal(t, uint(0), disperserClient.GetStatusCount) + + if disperserClient.DispenseErrorToReturn == nil { + assert.NotNil(t, disperserClient.ProvidedData) + assert.Equal(t, customQuorum, disperserClient.ProvidedQuorum) + + // Strip away the extra encoding bytes. We should have data of the expected size. + decodedData := codec.RemoveEmptyByteFromPaddedBytes(disperserClient.ProvidedData) + assert.Equal(t, dataSize, uint64(len(decodedData))) + + // Verify that the proper data was sent to the unconfirmed key handler. + assert.Equal(t, uint(len(disperserClient.ProvidedData)), unconfirmedKeyHandler.ProvidedSize) + checksum := md5.Sum(disperserClient.ProvidedData) + assert.Equal(t, checksum, unconfirmedKeyHandler.ProvidedChecksum) + assert.Equal(t, disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) + + // Verify that data has the proper amount of randomness. + if previousData != nil { + if randomizeBlobs { + // We expect each blob to be different. + assert.NotEqual(t, previousData, disperserClient.ProvidedData) + } else { + // We expect each blob to be the same. + assert.Equal(t, previousData, disperserClient.ProvidedData) + } + } + previousData = disperserClient.ProvidedData + } + + // Verify metrics. + assert.Equal(t, float64(i+1-errorCount), generatorMetrics.GetCount("write_success")) + assert.Equal(t, float64(errorCount), generatorMetrics.GetCount("write_failure")) + } + + cancel() + tu.ExecuteWithTimeout(func() { + waitGroup.Wait() + }, time.Second) +} diff --git a/tools/traffic/test/mock_disperser.go b/tools/traffic/test/mock_disperser.go new file mode 100644 index 0000000000..a03b3bb2ab --- /dev/null +++ b/tools/traffic/test/mock_disperser.go @@ -0,0 +1,96 @@ +package test + +import ( + "context" + disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/stretchr/testify/assert" + "sync" + "testing" +) + +type mockDisperserClient struct { + t *testing.T + // if true, DisperseBlobAuthenticated is expected to be used, otherwise DisperseBlob is expected to be used + authenticated bool + + // The next status, key, and error to return from DisperseBlob or DisperseBlobAuthenticated + StatusToReturn disperser.BlobStatus + KeyToReturn []byte + DispenseErrorToReturn error + + // The previous values passed to DisperseBlob or DisperseBlobAuthenticated + ProvidedData []byte + ProvidedQuorum []uint8 + + // Incremented each time DisperseBlob or DisperseBlobAuthenticated is called. + DisperseCount uint + + // A map from key (in string form) to the status to return from GetBlobStatus. If nil, then an error is returned. + StatusMap map[string]disperser_rpc.BlobStatus + + // Incremented each time GetBlobStatus is called. + GetStatusCount uint + + lock *sync.Mutex +} + +func newMockDisperserClient(t *testing.T, lock *sync.Mutex, authenticated bool) *mockDisperserClient { + return &mockDisperserClient{ + t: t, + lock: lock, + authenticated: authenticated, + StatusMap: make(map[string]disperser_rpc.BlobStatus), + } +} + +func (m *mockDisperserClient) DisperseBlob( + ctx context.Context, + data []byte, + customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { + + m.lock.Lock() + defer m.lock.Unlock() + + assert.False(m.t, m.authenticated, "writer configured to use non-authenticated disperser method") + m.ProvidedData = data + m.ProvidedQuorum = customQuorums + m.DisperseCount++ + return &m.StatusToReturn, m.KeyToReturn, m.DispenseErrorToReturn +} + +func (m *mockDisperserClient) DisperseBlobAuthenticated( + ctx context.Context, + data []byte, + customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { + + m.lock.Lock() + defer m.lock.Unlock() + + assert.True(m.t, m.authenticated, "writer configured to use authenticated disperser method") + m.ProvidedData = data + m.ProvidedQuorum = customQuorums + m.DisperseCount++ + return &m.StatusToReturn, m.KeyToReturn, m.DispenseErrorToReturn +} + +func (m *mockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { + m.lock.Lock() + defer m.lock.Unlock() + + status := m.StatusMap[string(key)] + m.GetStatusCount++ + + return &disperser_rpc.BlobStatusReply{ + Status: status, + Info: &disperser_rpc.BlobInfo{ + BlobVerificationProof: &disperser_rpc.BlobVerificationProof{ + BatchMetadata: &disperser_rpc.BatchMetadata{}, + }, + }, + }, nil +} + +func (m *mockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { + panic("this method should not be called by the generator utility") +} diff --git a/tools/traffic/test/mock_interceptable_ticker.go b/tools/traffic/test/mock_interceptable_ticker.go new file mode 100644 index 0000000000..7179c37052 --- /dev/null +++ b/tools/traffic/test/mock_interceptable_ticker.go @@ -0,0 +1,27 @@ +package test + +import "time" + +// mockTicker is a deterministic implementation of the InterceptableTicker interface. +type mockTicker struct { + channel chan time.Time + now time.Time +} + +// newMockTicker creates a new InterceptableTicker that can be deterministically controlled in tests. +func newMockTicker(now time.Time) *mockTicker { + return &mockTicker{ + channel: make(chan time.Time), + now: now, + } +} + +func (m *mockTicker) GetTimeChannel() <-chan time.Time { + return m.channel +} + +// Tick advances the ticker by the specified duration. +func (m *mockTicker) Tick(elapsedTime time.Duration) { + m.now = m.now.Add(elapsedTime) + m.channel <- m.now +} diff --git a/tools/traffic/test/mock_key_handler.go b/tools/traffic/test/mock_key_handler.go new file mode 100644 index 0000000000..b75a5015d7 --- /dev/null +++ b/tools/traffic/test/mock_key_handler.go @@ -0,0 +1,38 @@ +package test + +import ( + "sync" + "testing" +) + +// mockKeyHandler is a stand-in for the blob verifier's UnconfirmedKeyHandler. +type mockKeyHandler struct { + t *testing.T + + ProvidedKey []byte + ProvidedChecksum [16]byte + ProvidedSize uint + + // Incremented each time AddUnconfirmedKey is called. + Count uint + + lock *sync.Mutex +} + +func newMockKeyHandler(t *testing.T, lock *sync.Mutex) *mockKeyHandler { + return &mockKeyHandler{ + t: t, + lock: lock, + } +} + +func (m *mockKeyHandler) AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) { + m.lock.Lock() + defer m.lock.Unlock() + + m.ProvidedKey = key + m.ProvidedChecksum = checksum + m.ProvidedSize = size + + m.Count++ +} diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go new file mode 100644 index 0000000000..3d59f71570 --- /dev/null +++ b/tools/traffic/workers/blob_writer.go @@ -0,0 +1,166 @@ +package workers + +import ( + "context" + "crypto/md5" + "crypto/rand" + "fmt" + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + config2 "github.com/Layr-Labs/eigenda/tools/traffic/config" + "github.com/Layr-Labs/eigenda/tools/traffic/metrics" + "github.com/Layr-Labs/eigensdk-go/logging" + "sync" +) + +// BlobWriter sends blobs to a disperser at a configured rate. +type BlobWriter struct { + // The context for the generator. All work should cease when this context is cancelled. + ctx *context.Context + + // Tracks the number of active goroutines within the generator. + waitGroup *sync.WaitGroup + + // All logs should be written using this logger. + logger logging.Logger + + // ticker is used to control the rate at which blobs are sent to the disperser. + ticker InterceptableTicker + + // Config contains the configuration for the generator. + config *config2.WorkerConfig + + // disperser is the client used to send blobs to the disperser. + disperser clients.DisperserClient + + // Unconfirmed keys are sent here. + unconfirmedKeyHandler KeyHandler + + // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. + fixedRandomData *[]byte + + // writeLatencyMetric is used to record latency for write requests. + writeLatencyMetric metrics.LatencyMetric + + // writeSuccessMetric is used to record the number of successful write requests. + writeSuccessMetric metrics.CountMetric + + // writeFailureMetric is used to record the number of failed write requests. + writeFailureMetric metrics.CountMetric +} + +// NewBlobWriter creates a new BlobWriter instance. +func NewBlobWriter( + ctx *context.Context, + waitGroup *sync.WaitGroup, + logger logging.Logger, + ticker InterceptableTicker, + config *config2.WorkerConfig, + disperser clients.DisperserClient, + unconfirmedKeyHandler KeyHandler, + generatorMetrics metrics.Metrics) BlobWriter { + + var fixedRandomData []byte + if config.RandomizeBlobs { + // New random data will be generated for each blob. + fixedRandomData = nil + } else { + // Use this random data for each blob. + fixedRandomData = make([]byte, config.DataSize) + _, err := rand.Read(fixedRandomData) + if err != nil { + panic(fmt.Sprintf("unable to read random data: %s", err)) + } + fixedRandomData = codec.ConvertByPaddingEmptyByte(fixedRandomData) + } + + return BlobWriter{ + ctx: ctx, + waitGroup: waitGroup, + logger: logger, + ticker: ticker, + config: config, + disperser: disperser, + unconfirmedKeyHandler: unconfirmedKeyHandler, + fixedRandomData: &fixedRandomData, + writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), + writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), + writeFailureMetric: generatorMetrics.NewCountMetric("write_failure"), + } +} + +// Start begins the blob writer goroutine. +func (writer *BlobWriter) Start() { + writer.waitGroup.Add(1) + go func() { + writer.run() + writer.waitGroup.Done() + }() +} + +// run sends blobs to a disperser at a configured rate. +// Continues and dues not return until the context is cancelled. +func (writer *BlobWriter) run() { + for { + select { + case <-(*writer.ctx).Done(): + return + case <-writer.ticker.GetTimeChannel(): + data := writer.getRandomData() + key, err := metrics.InvokeAndReportLatency(writer.writeLatencyMetric, func() ([]byte, error) { + return writer.sendRequest(*data) + }) + if err != nil { + writer.writeFailureMetric.Increment() + writer.logger.Error("failed to send blob request", "err:", err) + continue + } + + writer.writeSuccessMetric.Increment() + + checksum := md5.Sum(*data) + writer.unconfirmedKeyHandler.AddUnconfirmedKey(key, checksum, uint(len(*data))) + } + } +} + +// getRandomData returns a slice of random data to be used for a blob. +func (writer *BlobWriter) getRandomData() *[]byte { + if *writer.fixedRandomData != nil { + return writer.fixedRandomData + } + + data := make([]byte, writer.config.DataSize) + _, err := rand.Read(data) + if err != nil { + panic(fmt.Sprintf("unable to read random data: %s", err)) + } + data = codec.ConvertByPaddingEmptyByte(data) + + return &data +} + +// sendRequest sends a blob to a disperser. +func (writer *BlobWriter) sendRequest(data []byte) ([]byte /* key */, error) { + + ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.config.WriteTimeout) + defer cancel() + + var key []byte + var err error + if writer.config.SignerPrivateKey != "" { + _, key, err = writer.disperser.DisperseBlobAuthenticated( + ctxTimeout, + data, + writer.config.CustomQuorums) + } else { + _, key, err = writer.disperser.DisperseBlob( + ctxTimeout, + data, + writer.config.CustomQuorums) + } + if err != nil { + return nil, err + } + return key, nil +} diff --git a/tools/traffic/workers/interceptable_ticker.go b/tools/traffic/workers/interceptable_ticker.go new file mode 100644 index 0000000000..847f6acb25 --- /dev/null +++ b/tools/traffic/workers/interceptable_ticker.go @@ -0,0 +1,26 @@ +package workers + +import "time" + +// InterceptableTicker is a wrapper around the time.Ticker struct. +// It allows for deterministic time passage to be simulated in tests. +type InterceptableTicker interface { + // getTimeChannel returns the channel that the ticker sends ticks on. Equivalent to time.Ticker.C. + GetTimeChannel() <-chan time.Time +} + +// standardTicker behaves exactly like a time.Ticker, for use in production code. +type standardTicker struct { + ticker *time.Ticker +} + +// NewTicker creates a new InterceptableTicker that behaves like a time.Ticker. +func NewTicker(d time.Duration) InterceptableTicker { + return &standardTicker{ + ticker: time.NewTicker(d), + } +} + +func (s *standardTicker) GetTimeChannel() <-chan time.Time { + return s.ticker.C +} diff --git a/tools/traffic/workers/key_handler.go b/tools/traffic/workers/key_handler.go new file mode 100644 index 0000000000..30c8b5ed9c --- /dev/null +++ b/tools/traffic/workers/key_handler.go @@ -0,0 +1,7 @@ +package workers + +// KeyHandler is an interface describing an object that can accept unconfirmed keys. +type KeyHandler interface { + // AddUnconfirmedKey accepts an unconfirmed blob key, the checksum of the blob, and the size of the blob in bytes. + AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) +} From 72b57ba1ae471514129afd79a7a849fda2326801 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Wed, 7 Aug 2024 07:19:28 -0500 Subject: [PATCH 02/10] Fix test flake. Signed-off-by: Cody Littley --- tools/traffic/test/blob_writer_test.go | 27 ++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/tools/traffic/test/blob_writer_test.go b/tools/traffic/test/blob_writer_test.go index 191aeb03d7..fb1b834a06 100644 --- a/tools/traffic/test/blob_writer_test.go +++ b/tools/traffic/test/blob_writer_test.go @@ -1,6 +1,7 @@ package test import ( + "bytes" "context" "crypto/md5" "fmt" @@ -93,13 +94,9 @@ func TestBlobWriter(t *testing.T) { tu.AssertEventuallyTrue(t, func() bool { lock.Lock() defer lock.Unlock() - return int(disperserClient.DisperseCount) > i && int(unconfirmedKeyHandler.Count)+errorCount > i - }, time.Second) - // These methods should be called exactly once per tick if there are no errors. - // In the presence of errors, nothing should be passed to the unconfirmed key handler. - assert.Equal(t, uint(i+1), disperserClient.DisperseCount) - assert.Equal(t, uint(i+1-errorCount), unconfirmedKeyHandler.Count) + return uint(i+1) == disperserClient.DisperseCount && uint(i+1-errorCount) == unconfirmedKeyHandler.Count + }, time.Second) // This method should not be called in this test. assert.Equal(t, uint(0), disperserClient.GetStatusCount) @@ -115,8 +112,16 @@ func TestBlobWriter(t *testing.T) { // Verify that the proper data was sent to the unconfirmed key handler. assert.Equal(t, uint(len(disperserClient.ProvidedData)), unconfirmedKeyHandler.ProvidedSize) checksum := md5.Sum(disperserClient.ProvidedData) - assert.Equal(t, checksum, unconfirmedKeyHandler.ProvidedChecksum) - assert.Equal(t, disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) + + tu.AssertEventuallyTrue(t, func() bool { + lock.Lock() + defer lock.Unlock() + return bytes.Equal(checksum[:], unconfirmedKeyHandler.ProvidedChecksum[:]) && + bytes.Equal(disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) + }, time.Second) + + //assert.Equal(t, checksum, unconfirmedKeyHandler.ProvidedChecksum) + //assert.Equal(t, disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) // Verify that data has the proper amount of randomness. if previousData != nil { @@ -132,8 +137,10 @@ func TestBlobWriter(t *testing.T) { } // Verify metrics. - assert.Equal(t, float64(i+1-errorCount), generatorMetrics.GetCount("write_success")) - assert.Equal(t, float64(errorCount), generatorMetrics.GetCount("write_failure")) + tu.AssertEventuallyTrue(t, func() bool { + return float64(i+1) == generatorMetrics.GetCount("write_success") && + float64(errorCount) == generatorMetrics.GetCount("write_failure") + }, time.Second) } cancel() From 6c862c24e69fc91f11302aed4a99bb62d67a475d Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 8 Aug 2024 08:27:47 -0500 Subject: [PATCH 03/10] Made requested changes. Signed-off-by: Cody Littley --- tools/traffic/test/mock_disperser.go | 3 ++ .../traffic/test/mock_interceptable_ticker.go | 7 +++- tools/traffic/test/mock_key_handler.go | 3 ++ tools/traffic/workers/blob_writer.go | 33 +++++++++++-------- 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/tools/traffic/test/mock_disperser.go b/tools/traffic/test/mock_disperser.go index a03b3bb2ab..44900e71cb 100644 --- a/tools/traffic/test/mock_disperser.go +++ b/tools/traffic/test/mock_disperser.go @@ -2,6 +2,7 @@ package test import ( "context" + "github.com/Layr-Labs/eigenda/api/clients" disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" "github.com/Layr-Labs/eigenda/disperser" "github.com/stretchr/testify/assert" @@ -9,6 +10,8 @@ import ( "testing" ) +var _ clients.DisperserClient = (*mockDisperserClient)(nil) + type mockDisperserClient struct { t *testing.T // if true, DisperseBlobAuthenticated is expected to be used, otherwise DisperseBlob is expected to be used diff --git a/tools/traffic/test/mock_interceptable_ticker.go b/tools/traffic/test/mock_interceptable_ticker.go index 7179c37052..326847f041 100644 --- a/tools/traffic/test/mock_interceptable_ticker.go +++ b/tools/traffic/test/mock_interceptable_ticker.go @@ -1,6 +1,11 @@ package test -import "time" +import ( + "github.com/Layr-Labs/eigenda/tools/traffic/workers" + "time" +) + +var _ workers.InterceptableTicker = (*mockTicker)(nil) // mockTicker is a deterministic implementation of the InterceptableTicker interface. type mockTicker struct { diff --git a/tools/traffic/test/mock_key_handler.go b/tools/traffic/test/mock_key_handler.go index b75a5015d7..6cc15f301f 100644 --- a/tools/traffic/test/mock_key_handler.go +++ b/tools/traffic/test/mock_key_handler.go @@ -1,10 +1,13 @@ package test import ( + "github.com/Layr-Labs/eigenda/tools/traffic/workers" "sync" "testing" ) +var _ workers.KeyHandler = (*mockKeyHandler)(nil) + // mockKeyHandler is a stand-in for the blob verifier's UnconfirmedKeyHandler. type mockKeyHandler struct { t *testing.T diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index 3d59f71570..815ebe2781 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -106,24 +106,29 @@ func (writer *BlobWriter) run() { case <-(*writer.ctx).Done(): return case <-writer.ticker.GetTimeChannel(): - data := writer.getRandomData() - key, err := metrics.InvokeAndReportLatency(writer.writeLatencyMetric, func() ([]byte, error) { - return writer.sendRequest(*data) - }) - if err != nil { - writer.writeFailureMetric.Increment() - writer.logger.Error("failed to send blob request", "err:", err) - continue - } - - writer.writeSuccessMetric.Increment() - - checksum := md5.Sum(*data) - writer.unconfirmedKeyHandler.AddUnconfirmedKey(key, checksum, uint(len(*data))) + writer.writeNextBlob() } } } +// writeNextBlob attempts to send a random blob to the disperser. +func (writer *BlobWriter) writeNextBlob() { + data := writer.getRandomData() + key, err := metrics.InvokeAndReportLatency(writer.writeLatencyMetric, func() ([]byte, error) { + return writer.sendRequest(*data) + }) + if err != nil { + writer.writeFailureMetric.Increment() + writer.logger.Error("failed to send blob request", "err:", err) + return + } + + writer.writeSuccessMetric.Increment() + + checksum := md5.Sum(*data) + writer.unconfirmedKeyHandler.AddUnconfirmedKey(key, checksum, uint(len(*data))) +} + // getRandomData returns a slice of random data to be used for a blob. func (writer *BlobWriter) getRandomData() *[]byte { if *writer.fixedRandomData != nil { From 1508a261920b40d20a274479fb4a535a8cb1e0ed Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 8 Aug 2024 08:49:55 -0500 Subject: [PATCH 04/10] More suggested changes. Signed-off-by: Cody Littley --- tools/traffic/test/mock_key_handler.go | 41 ---------------- tools/traffic/workers/blob_writer.go | 30 +++++------- .../{test => workers}/blob_writer_test.go | 47 +++++-------------- .../{test => workers}/mock_disperser.go | 31 ++++-------- .../mock_interceptable_ticker.go | 5 +- tools/traffic/workers/mock_key_handler.go | 33 +++++++++++++ 6 files changed, 67 insertions(+), 120 deletions(-) delete mode 100644 tools/traffic/test/mock_key_handler.go rename tools/traffic/{test => workers}/blob_writer_test.go (70%) rename tools/traffic/{test => workers}/mock_disperser.go (78%) rename tools/traffic/{test => workers}/mock_interceptable_ticker.go (84%) create mode 100644 tools/traffic/workers/mock_key_handler.go diff --git a/tools/traffic/test/mock_key_handler.go b/tools/traffic/test/mock_key_handler.go deleted file mode 100644 index 6cc15f301f..0000000000 --- a/tools/traffic/test/mock_key_handler.go +++ /dev/null @@ -1,41 +0,0 @@ -package test - -import ( - "github.com/Layr-Labs/eigenda/tools/traffic/workers" - "sync" - "testing" -) - -var _ workers.KeyHandler = (*mockKeyHandler)(nil) - -// mockKeyHandler is a stand-in for the blob verifier's UnconfirmedKeyHandler. -type mockKeyHandler struct { - t *testing.T - - ProvidedKey []byte - ProvidedChecksum [16]byte - ProvidedSize uint - - // Incremented each time AddUnconfirmedKey is called. - Count uint - - lock *sync.Mutex -} - -func newMockKeyHandler(t *testing.T, lock *sync.Mutex) *mockKeyHandler { - return &mockKeyHandler{ - t: t, - lock: lock, - } -} - -func (m *mockKeyHandler) AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) { - m.lock.Lock() - defer m.lock.Unlock() - - m.ProvidedKey = key - m.ProvidedChecksum = checksum - m.ProvidedSize = size - - m.Count++ -} diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index 815ebe2781..39f770af24 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -11,6 +11,7 @@ import ( "github.com/Layr-Labs/eigenda/tools/traffic/metrics" "github.com/Layr-Labs/eigensdk-go/logging" "sync" + "time" ) // BlobWriter sends blobs to a disperser at a configured rate. @@ -24,9 +25,6 @@ type BlobWriter struct { // All logs should be written using this logger. logger logging.Logger - // ticker is used to control the rate at which blobs are sent to the disperser. - ticker InterceptableTicker - // Config contains the configuration for the generator. config *config2.WorkerConfig @@ -54,7 +52,6 @@ func NewBlobWriter( ctx *context.Context, waitGroup *sync.WaitGroup, logger logging.Logger, - ticker InterceptableTicker, config *config2.WorkerConfig, disperser clients.DisperserClient, unconfirmedKeyHandler KeyHandler, @@ -78,7 +75,6 @@ func NewBlobWriter( ctx: ctx, waitGroup: waitGroup, logger: logger, - ticker: ticker, config: config, disperser: disperser, unconfirmedKeyHandler: unconfirmedKeyHandler, @@ -92,25 +88,21 @@ func NewBlobWriter( // Start begins the blob writer goroutine. func (writer *BlobWriter) Start() { writer.waitGroup.Add(1) + ticker := time.NewTicker(writer.config.WriteRequestInterval) + go func() { - writer.run() + for { + select { + case <-(*writer.ctx).Done(): + return + case <-ticker.C: + writer.writeNextBlob() + } + } writer.waitGroup.Done() }() } -// run sends blobs to a disperser at a configured rate. -// Continues and dues not return until the context is cancelled. -func (writer *BlobWriter) run() { - for { - select { - case <-(*writer.ctx).Done(): - return - case <-writer.ticker.GetTimeChannel(): - writer.writeNextBlob() - } - } -} - // writeNextBlob attempts to send a random blob to the disperser. func (writer *BlobWriter) writeNextBlob() { data := writer.getRandomData() diff --git a/tools/traffic/test/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go similarity index 70% rename from tools/traffic/test/blob_writer_test.go rename to tools/traffic/workers/blob_writer_test.go index fb1b834a06..84139e79a4 100644 --- a/tools/traffic/test/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -1,7 +1,6 @@ -package test +package workers import ( - "bytes" "context" "crypto/md5" "fmt" @@ -10,12 +9,10 @@ import ( "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" - "github.com/Layr-Labs/eigenda/tools/traffic/workers" "github.com/stretchr/testify/assert" "golang.org/x/exp/rand" "sync" "testing" - "time" ) func TestBlobWriter(t *testing.T) { @@ -25,8 +22,6 @@ func TestBlobWriter(t *testing.T) { waitGroup := sync.WaitGroup{} logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.Nil(t, err) - startTime := time.Unix(rand.Int63()%2_000_000_000, 0) - ticker := newMockTicker(startTime) dataSize := rand.Uint64()%1024 + 64 @@ -51,23 +46,19 @@ func TestBlobWriter(t *testing.T) { CustomQuorums: customQuorum, } - lock := sync.Mutex{} - - disperserClient := newMockDisperserClient(t, &lock, authenticated) - unconfirmedKeyHandler := newMockKeyHandler(t, &lock) + disperserClient := NewMockDisperserClient(t, authenticated) + unconfirmedKeyHandler := NewMockKeyHandler(t) generatorMetrics := metrics.NewMockMetrics() - writer := workers.NewBlobWriter( + writer := NewBlobWriter( &ctx, &waitGroup, logger, - ticker, config, disperserClient, unconfirmedKeyHandler, generatorMetrics) - writer.Start() errorProbability := 0.1 errorCount := 0 @@ -87,16 +78,11 @@ func TestBlobWriter(t *testing.T) { _, err = rand.Read(disperserClient.KeyToReturn) assert.Nil(t, err) - // Move time forward, allowing the writer to attempt to send a blob. - ticker.Tick(1 * time.Second) - - // Wait until the writer finishes its work. - tu.AssertEventuallyTrue(t, func() bool { - lock.Lock() - defer lock.Unlock() + // Simulate the advancement of time (i.e. allow the writer to write the next blob). + writer.writeNextBlob() - return uint(i+1) == disperserClient.DisperseCount && uint(i+1-errorCount) == unconfirmedKeyHandler.Count - }, time.Second) + assert.Equal(t, uint(i+1), disperserClient.DisperseCount) + assert.Equal(t, uint(i+1-errorCount), unconfirmedKeyHandler.Count) // This method should not be called in this test. assert.Equal(t, uint(0), disperserClient.GetStatusCount) @@ -113,12 +99,8 @@ func TestBlobWriter(t *testing.T) { assert.Equal(t, uint(len(disperserClient.ProvidedData)), unconfirmedKeyHandler.ProvidedSize) checksum := md5.Sum(disperserClient.ProvidedData) - tu.AssertEventuallyTrue(t, func() bool { - lock.Lock() - defer lock.Unlock() - return bytes.Equal(checksum[:], unconfirmedKeyHandler.ProvidedChecksum[:]) && - bytes.Equal(disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) - }, time.Second) + assert.Equal(t, checksum, unconfirmedKeyHandler.ProvidedChecksum) + assert.Equal(t, disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) //assert.Equal(t, checksum, unconfirmedKeyHandler.ProvidedChecksum) //assert.Equal(t, disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) @@ -137,14 +119,9 @@ func TestBlobWriter(t *testing.T) { } // Verify metrics. - tu.AssertEventuallyTrue(t, func() bool { - return float64(i+1) == generatorMetrics.GetCount("write_success") && - float64(errorCount) == generatorMetrics.GetCount("write_failure") - }, time.Second) + assert.Equal(t, float64(i+1-errorCount), generatorMetrics.GetCount("write_success")) + assert.Equal(t, float64(errorCount), generatorMetrics.GetCount("write_failure")) } cancel() - tu.ExecuteWithTimeout(func() { - waitGroup.Wait() - }, time.Second) } diff --git a/tools/traffic/test/mock_disperser.go b/tools/traffic/workers/mock_disperser.go similarity index 78% rename from tools/traffic/test/mock_disperser.go rename to tools/traffic/workers/mock_disperser.go index 44900e71cb..69b635777f 100644 --- a/tools/traffic/test/mock_disperser.go +++ b/tools/traffic/workers/mock_disperser.go @@ -1,4 +1,4 @@ -package test +package workers import ( "context" @@ -6,13 +6,12 @@ import ( disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" "github.com/Layr-Labs/eigenda/disperser" "github.com/stretchr/testify/assert" - "sync" "testing" ) -var _ clients.DisperserClient = (*mockDisperserClient)(nil) +var _ clients.DisperserClient = (*MockDisperserClient)(nil) -type mockDisperserClient struct { +type MockDisperserClient struct { t *testing.T // if true, DisperseBlobAuthenticated is expected to be used, otherwise DisperseBlob is expected to be used authenticated bool @@ -34,27 +33,21 @@ type mockDisperserClient struct { // Incremented each time GetBlobStatus is called. GetStatusCount uint - - lock *sync.Mutex } -func newMockDisperserClient(t *testing.T, lock *sync.Mutex, authenticated bool) *mockDisperserClient { - return &mockDisperserClient{ +func NewMockDisperserClient(t *testing.T, authenticated bool) *MockDisperserClient { + return &MockDisperserClient{ t: t, - lock: lock, authenticated: authenticated, StatusMap: make(map[string]disperser_rpc.BlobStatus), } } -func (m *mockDisperserClient) DisperseBlob( +func (m *MockDisperserClient) DisperseBlob( ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { - m.lock.Lock() - defer m.lock.Unlock() - assert.False(m.t, m.authenticated, "writer configured to use non-authenticated disperser method") m.ProvidedData = data m.ProvidedQuorum = customQuorums @@ -62,14 +55,11 @@ func (m *mockDisperserClient) DisperseBlob( return &m.StatusToReturn, m.KeyToReturn, m.DispenseErrorToReturn } -func (m *mockDisperserClient) DisperseBlobAuthenticated( +func (m *MockDisperserClient) DisperseBlobAuthenticated( ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { - m.lock.Lock() - defer m.lock.Unlock() - assert.True(m.t, m.authenticated, "writer configured to use authenticated disperser method") m.ProvidedData = data m.ProvidedQuorum = customQuorums @@ -77,10 +67,7 @@ func (m *mockDisperserClient) DisperseBlobAuthenticated( return &m.StatusToReturn, m.KeyToReturn, m.DispenseErrorToReturn } -func (m *mockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { - m.lock.Lock() - defer m.lock.Unlock() - +func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { status := m.StatusMap[string(key)] m.GetStatusCount++ @@ -94,6 +81,6 @@ func (m *mockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*d }, nil } -func (m *mockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { +func (m *MockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { panic("this method should not be called by the generator utility") } diff --git a/tools/traffic/test/mock_interceptable_ticker.go b/tools/traffic/workers/mock_interceptable_ticker.go similarity index 84% rename from tools/traffic/test/mock_interceptable_ticker.go rename to tools/traffic/workers/mock_interceptable_ticker.go index 326847f041..106edae254 100644 --- a/tools/traffic/test/mock_interceptable_ticker.go +++ b/tools/traffic/workers/mock_interceptable_ticker.go @@ -1,11 +1,10 @@ -package test +package workers import ( - "github.com/Layr-Labs/eigenda/tools/traffic/workers" "time" ) -var _ workers.InterceptableTicker = (*mockTicker)(nil) +var _ InterceptableTicker = (*mockTicker)(nil) // mockTicker is a deterministic implementation of the InterceptableTicker interface. type mockTicker struct { diff --git a/tools/traffic/workers/mock_key_handler.go b/tools/traffic/workers/mock_key_handler.go new file mode 100644 index 0000000000..4ae520ee67 --- /dev/null +++ b/tools/traffic/workers/mock_key_handler.go @@ -0,0 +1,33 @@ +package workers + +import ( + "testing" +) + +var _ KeyHandler = (*MockKeyHandler)(nil) + +// MockKeyHandler is a stand-in for the blob verifier's UnconfirmedKeyHandler. +type MockKeyHandler struct { + t *testing.T + + ProvidedKey []byte + ProvidedChecksum [16]byte + ProvidedSize uint + + // Incremented each time AddUnconfirmedKey is called. + Count uint +} + +func NewMockKeyHandler(t *testing.T) *MockKeyHandler { + return &MockKeyHandler{ + t: t, + } +} + +func (m *MockKeyHandler) AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) { + m.ProvidedKey = key + m.ProvidedChecksum = checksum + m.ProvidedSize = size + + m.Count++ +} From cd9da4d539ecd23184a7f2a18b05d4e0bd52d7f3 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 8 Aug 2024 09:11:35 -0500 Subject: [PATCH 05/10] Made more suggested changes. Signed-off-by: Cody Littley --- tools/traffic/workers/blob_writer_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index 84139e79a4..68aa3dbc54 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -60,13 +60,12 @@ func TestBlobWriter(t *testing.T) { unconfirmedKeyHandler, generatorMetrics) - errorProbability := 0.1 errorCount := 0 var previousData []byte for i := 0; i < 100; i++ { - if rand.Float64() < errorProbability { + if i%10 == 0 { disperserClient.DispenseErrorToReturn = fmt.Errorf("intentional error for testing purposes") errorCount++ } else { From 845477e412bbd38d412250afbc6ceb3b7e80677c Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Thu, 8 Aug 2024 09:13:26 -0500 Subject: [PATCH 06/10] lint Signed-off-by: Cody Littley --- tools/traffic/workers/blob_writer.go | 3 +- tools/traffic/workers/interceptable_ticker.go | 26 ---------------- .../workers/mock_interceptable_ticker.go | 31 ------------------- 3 files changed, 2 insertions(+), 58 deletions(-) delete mode 100644 tools/traffic/workers/interceptable_ticker.go delete mode 100644 tools/traffic/workers/mock_interceptable_ticker.go diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index 39f770af24..5cad8f2c1b 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -91,6 +91,8 @@ func (writer *BlobWriter) Start() { ticker := time.NewTicker(writer.config.WriteRequestInterval) go func() { + defer writer.waitGroup.Done() + for { select { case <-(*writer.ctx).Done(): @@ -99,7 +101,6 @@ func (writer *BlobWriter) Start() { writer.writeNextBlob() } } - writer.waitGroup.Done() }() } diff --git a/tools/traffic/workers/interceptable_ticker.go b/tools/traffic/workers/interceptable_ticker.go deleted file mode 100644 index 847f6acb25..0000000000 --- a/tools/traffic/workers/interceptable_ticker.go +++ /dev/null @@ -1,26 +0,0 @@ -package workers - -import "time" - -// InterceptableTicker is a wrapper around the time.Ticker struct. -// It allows for deterministic time passage to be simulated in tests. -type InterceptableTicker interface { - // getTimeChannel returns the channel that the ticker sends ticks on. Equivalent to time.Ticker.C. - GetTimeChannel() <-chan time.Time -} - -// standardTicker behaves exactly like a time.Ticker, for use in production code. -type standardTicker struct { - ticker *time.Ticker -} - -// NewTicker creates a new InterceptableTicker that behaves like a time.Ticker. -func NewTicker(d time.Duration) InterceptableTicker { - return &standardTicker{ - ticker: time.NewTicker(d), - } -} - -func (s *standardTicker) GetTimeChannel() <-chan time.Time { - return s.ticker.C -} diff --git a/tools/traffic/workers/mock_interceptable_ticker.go b/tools/traffic/workers/mock_interceptable_ticker.go deleted file mode 100644 index 106edae254..0000000000 --- a/tools/traffic/workers/mock_interceptable_ticker.go +++ /dev/null @@ -1,31 +0,0 @@ -package workers - -import ( - "time" -) - -var _ InterceptableTicker = (*mockTicker)(nil) - -// mockTicker is a deterministic implementation of the InterceptableTicker interface. -type mockTicker struct { - channel chan time.Time - now time.Time -} - -// newMockTicker creates a new InterceptableTicker that can be deterministically controlled in tests. -func newMockTicker(now time.Time) *mockTicker { - return &mockTicker{ - channel: make(chan time.Time), - now: now, - } -} - -func (m *mockTicker) GetTimeChannel() <-chan time.Time { - return m.channel -} - -// Tick advances the ticker by the specified duration. -func (m *mockTicker) Tick(elapsedTime time.Duration) { - m.now = m.now.Add(elapsedTime) - m.channel <- m.now -} From 94a2c29d1bc22df84cbeedcadac7516677a612b0 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Tue, 13 Aug 2024 15:01:43 -0500 Subject: [PATCH 07/10] Started refactoring unit test. Signed-off-by: Cody Littley --- tools/traffic/workers/blob_writer_test.go | 14 ++++++-------- tools/traffic/workers/mock_disperser.go | 11 +++++++++++ tools/traffic/workers/mock_key_handler.go | 17 ++++------------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index 68aa3dbc54..f8c84b8e5d 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -10,6 +10,7 @@ import ( "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "golang.org/x/exp/rand" "sync" "testing" @@ -47,7 +48,9 @@ func TestBlobWriter(t *testing.T) { } disperserClient := NewMockDisperserClient(t, authenticated) - unconfirmedKeyHandler := NewMockKeyHandler(t) + unconfirmedKeyHandler := &MockKeyHandler{} + unconfirmedKeyHandler.mock.On( + "AddUnconfirmedKey", mock.Anything, mock.Anything, mock.Anything).Return(nil) generatorMetrics := metrics.NewMockMetrics() @@ -81,7 +84,7 @@ func TestBlobWriter(t *testing.T) { writer.writeNextBlob() assert.Equal(t, uint(i+1), disperserClient.DisperseCount) - assert.Equal(t, uint(i+1-errorCount), unconfirmedKeyHandler.Count) + unconfirmedKeyHandler.mock.AssertNumberOfCalls(t, "AddUnconfirmedKey", i+1-errorCount) // This method should not be called in this test. assert.Equal(t, uint(0), disperserClient.GetStatusCount) @@ -95,14 +98,9 @@ func TestBlobWriter(t *testing.T) { assert.Equal(t, dataSize, uint64(len(decodedData))) // Verify that the proper data was sent to the unconfirmed key handler. - assert.Equal(t, uint(len(disperserClient.ProvidedData)), unconfirmedKeyHandler.ProvidedSize) checksum := md5.Sum(disperserClient.ProvidedData) - assert.Equal(t, checksum, unconfirmedKeyHandler.ProvidedChecksum) - assert.Equal(t, disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) - - //assert.Equal(t, checksum, unconfirmedKeyHandler.ProvidedChecksum) - //assert.Equal(t, disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) + unconfirmedKeyHandler.mock.AssertCalled(t, "AddUnconfirmedKey", disperserClient.KeyToReturn, checksum, uint(len(disperserClient.ProvidedData))) // Verify that data has the proper amount of randomness. if previousData != nil { diff --git a/tools/traffic/workers/mock_disperser.go b/tools/traffic/workers/mock_disperser.go index 69b635777f..25bc5e775d 100644 --- a/tools/traffic/workers/mock_disperser.go +++ b/tools/traffic/workers/mock_disperser.go @@ -6,12 +6,15 @@ import ( disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" "github.com/Layr-Labs/eigenda/disperser" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "testing" ) var _ clients.DisperserClient = (*MockDisperserClient)(nil) type MockDisperserClient struct { + mock mock.Mock + t *testing.T // if true, DisperseBlobAuthenticated is expected to be used, otherwise DisperseBlob is expected to be used authenticated bool @@ -48,6 +51,8 @@ func (m *MockDisperserClient) DisperseBlob( data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { + m.mock.Called(ctx, data, customQuorums) + assert.False(m.t, m.authenticated, "writer configured to use non-authenticated disperser method") m.ProvidedData = data m.ProvidedQuorum = customQuorums @@ -60,6 +65,8 @@ func (m *MockDisperserClient) DisperseBlobAuthenticated( data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { + m.mock.Called(ctx, data, customQuorums) + assert.True(m.t, m.authenticated, "writer configured to use authenticated disperser method") m.ProvidedData = data m.ProvidedQuorum = customQuorums @@ -68,6 +75,8 @@ func (m *MockDisperserClient) DisperseBlobAuthenticated( } func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { + m.mock.Called(ctx, key) + status := m.StatusMap[string(key)] m.GetStatusCount++ @@ -82,5 +91,7 @@ func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*d } func (m *MockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { + m.mock.Called(ctx, batchHeaderHash, blobIndex) + panic("this method should not be called by the generator utility") } diff --git a/tools/traffic/workers/mock_key_handler.go b/tools/traffic/workers/mock_key_handler.go index 4ae520ee67..2c48de995b 100644 --- a/tools/traffic/workers/mock_key_handler.go +++ b/tools/traffic/workers/mock_key_handler.go @@ -1,33 +1,24 @@ package workers import ( - "testing" + "github.com/stretchr/testify/mock" ) var _ KeyHandler = (*MockKeyHandler)(nil) // MockKeyHandler is a stand-in for the blob verifier's UnconfirmedKeyHandler. type MockKeyHandler struct { - t *testing.T + mock mock.Mock ProvidedKey []byte ProvidedChecksum [16]byte ProvidedSize uint - - // Incremented each time AddUnconfirmedKey is called. - Count uint -} - -func NewMockKeyHandler(t *testing.T) *MockKeyHandler { - return &MockKeyHandler{ - t: t, - } } func (m *MockKeyHandler) AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) { + m.mock.Called(key, checksum, size) + m.ProvidedKey = key m.ProvidedChecksum = checksum m.ProvidedSize = size - - m.Count++ } From 38288fceb54807e4582d9051107bfb9bed9a2ea7 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Tue, 13 Aug 2024 15:55:06 -0500 Subject: [PATCH 08/10] Finsihed refactoring tests. Signed-off-by: Cody Littley --- tools/traffic/workers/blob_writer_test.go | 44 +++++++++------ tools/traffic/workers/mock_disperser.go | 69 +++-------------------- 2 files changed, 35 insertions(+), 78 deletions(-) diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go index f8c84b8e5d..723894490a 100644 --- a/tools/traffic/workers/blob_writer_test.go +++ b/tools/traffic/workers/blob_writer_test.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/Layr-Labs/eigenda/common" tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" @@ -31,6 +32,12 @@ func TestBlobWriter(t *testing.T) { if authenticated { signerPrivateKey = "asdf" } + var functionName string + if authenticated { + functionName = "DisperseBlobAuthenticated" + } else { + functionName = "DisperseBlob" + } randomizeBlobs := rand.Intn(2) == 0 @@ -47,7 +54,7 @@ func TestBlobWriter(t *testing.T) { CustomQuorums: customQuorum, } - disperserClient := NewMockDisperserClient(t, authenticated) + disperserClient := &MockDisperserClient{} unconfirmedKeyHandler := &MockKeyHandler{} unconfirmedKeyHandler.mock.On( "AddUnconfirmedKey", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -68,51 +75,54 @@ func TestBlobWriter(t *testing.T) { var previousData []byte for i := 0; i < 100; i++ { + var errorToReturn error if i%10 == 0 { - disperserClient.DispenseErrorToReturn = fmt.Errorf("intentional error for testing purposes") + errorToReturn = fmt.Errorf("intentional error for testing purposes") errorCount++ } else { - disperserClient.DispenseErrorToReturn = nil + errorToReturn = nil } // This is the key that will be assigned to the next blob. - disperserClient.KeyToReturn = make([]byte, 32) - _, err = rand.Read(disperserClient.KeyToReturn) + keyToReturn := make([]byte, 32) + _, err = rand.Read(keyToReturn) assert.Nil(t, err) + status := disperser.Processing + disperserClient.mock = mock.Mock{} // reset mock state + disperserClient.mock.On(functionName, mock.Anything, customQuorum).Return(&status, keyToReturn, errorToReturn) + // Simulate the advancement of time (i.e. allow the writer to write the next blob). writer.writeNextBlob() - assert.Equal(t, uint(i+1), disperserClient.DisperseCount) + disperserClient.mock.AssertNumberOfCalls(t, functionName, 1) unconfirmedKeyHandler.mock.AssertNumberOfCalls(t, "AddUnconfirmedKey", i+1-errorCount) - // This method should not be called in this test. - assert.Equal(t, uint(0), disperserClient.GetStatusCount) + if errorToReturn == nil { - if disperserClient.DispenseErrorToReturn == nil { - assert.NotNil(t, disperserClient.ProvidedData) - assert.Equal(t, customQuorum, disperserClient.ProvidedQuorum) + dataSentToDisperser := disperserClient.mock.Calls[0].Arguments.Get(0).([]byte) + assert.NotNil(t, dataSentToDisperser) // Strip away the extra encoding bytes. We should have data of the expected size. - decodedData := codec.RemoveEmptyByteFromPaddedBytes(disperserClient.ProvidedData) + decodedData := codec.RemoveEmptyByteFromPaddedBytes(dataSentToDisperser) assert.Equal(t, dataSize, uint64(len(decodedData))) // Verify that the proper data was sent to the unconfirmed key handler. - checksum := md5.Sum(disperserClient.ProvidedData) + checksum := md5.Sum(dataSentToDisperser) - unconfirmedKeyHandler.mock.AssertCalled(t, "AddUnconfirmedKey", disperserClient.KeyToReturn, checksum, uint(len(disperserClient.ProvidedData))) + unconfirmedKeyHandler.mock.AssertCalled(t, "AddUnconfirmedKey", keyToReturn, checksum, uint(len(dataSentToDisperser))) // Verify that data has the proper amount of randomness. if previousData != nil { if randomizeBlobs { // We expect each blob to be different. - assert.NotEqual(t, previousData, disperserClient.ProvidedData) + assert.NotEqual(t, previousData, dataSentToDisperser) } else { // We expect each blob to be the same. - assert.Equal(t, previousData, disperserClient.ProvidedData) + assert.Equal(t, previousData, dataSentToDisperser) } } - previousData = disperserClient.ProvidedData + previousData = dataSentToDisperser } // Verify metrics. diff --git a/tools/traffic/workers/mock_disperser.go b/tools/traffic/workers/mock_disperser.go index 25bc5e775d..ba1b013880 100644 --- a/tools/traffic/workers/mock_disperser.go +++ b/tools/traffic/workers/mock_disperser.go @@ -5,45 +5,13 @@ import ( "github.com/Layr-Labs/eigenda/api/clients" disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" "github.com/Layr-Labs/eigenda/disperser" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "testing" ) var _ clients.DisperserClient = (*MockDisperserClient)(nil) type MockDisperserClient struct { mock mock.Mock - - t *testing.T - // if true, DisperseBlobAuthenticated is expected to be used, otherwise DisperseBlob is expected to be used - authenticated bool - - // The next status, key, and error to return from DisperseBlob or DisperseBlobAuthenticated - StatusToReturn disperser.BlobStatus - KeyToReturn []byte - DispenseErrorToReturn error - - // The previous values passed to DisperseBlob or DisperseBlobAuthenticated - ProvidedData []byte - ProvidedQuorum []uint8 - - // Incremented each time DisperseBlob or DisperseBlobAuthenticated is called. - DisperseCount uint - - // A map from key (in string form) to the status to return from GetBlobStatus. If nil, then an error is returned. - StatusMap map[string]disperser_rpc.BlobStatus - - // Incremented each time GetBlobStatus is called. - GetStatusCount uint -} - -func NewMockDisperserClient(t *testing.T, authenticated bool) *MockDisperserClient { - return &MockDisperserClient{ - t: t, - authenticated: authenticated, - StatusMap: make(map[string]disperser_rpc.BlobStatus), - } } func (m *MockDisperserClient) DisperseBlob( @@ -51,13 +19,9 @@ func (m *MockDisperserClient) DisperseBlob( data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { - m.mock.Called(ctx, data, customQuorums) + args := m.mock.Called(data, customQuorums) - assert.False(m.t, m.authenticated, "writer configured to use non-authenticated disperser method") - m.ProvidedData = data - m.ProvidedQuorum = customQuorums - m.DisperseCount++ - return &m.StatusToReturn, m.KeyToReturn, m.DispenseErrorToReturn + return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) } func (m *MockDisperserClient) DisperseBlobAuthenticated( @@ -65,33 +29,16 @@ func (m *MockDisperserClient) DisperseBlobAuthenticated( data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { - m.mock.Called(ctx, data, customQuorums) - - assert.True(m.t, m.authenticated, "writer configured to use authenticated disperser method") - m.ProvidedData = data - m.ProvidedQuorum = customQuorums - m.DisperseCount++ - return &m.StatusToReturn, m.KeyToReturn, m.DispenseErrorToReturn + args := m.mock.Called(data, customQuorums) + return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) } func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { - m.mock.Called(ctx, key) - - status := m.StatusMap[string(key)] - m.GetStatusCount++ - - return &disperser_rpc.BlobStatusReply{ - Status: status, - Info: &disperser_rpc.BlobInfo{ - BlobVerificationProof: &disperser_rpc.BlobVerificationProof{ - BatchMetadata: &disperser_rpc.BatchMetadata{}, - }, - }, - }, nil + args := m.mock.Called(key) + return args.Get(0).(*disperser_rpc.BlobStatusReply), args.Error(1) } func (m *MockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { - m.mock.Called(ctx, batchHeaderHash, blobIndex) - - panic("this method should not be called by the generator utility") + args := m.mock.Called(batchHeaderHash, blobIndex) + return args.Get(0).([]byte), args.Error(1) } From 656294e579cff4fcf0699c2061c2b4a45ecb036e Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Wed, 14 Aug 2024 07:51:54 -0500 Subject: [PATCH 09/10] Made suggested changes. Signed-off-by: Cody Littley --- tools/traffic/workers/blob_writer.go | 38 +++++++++++++--------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index 5cad8f2c1b..b2cefe17de 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -35,7 +35,7 @@ type BlobWriter struct { unconfirmedKeyHandler KeyHandler // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. - fixedRandomData *[]byte + fixedRandomData []byte // writeLatencyMetric is used to record latency for write requests. writeLatencyMetric metrics.LatencyMetric @@ -78,7 +78,7 @@ func NewBlobWriter( config: config, disperser: disperser, unconfirmedKeyHandler: unconfirmedKeyHandler, - fixedRandomData: &fixedRandomData, + fixedRandomData: fixedRandomData, writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), writeFailureMetric: generatorMetrics.NewCountMetric("write_failure"), @@ -106,46 +106,47 @@ func (writer *BlobWriter) Start() { // writeNextBlob attempts to send a random blob to the disperser. func (writer *BlobWriter) writeNextBlob() { - data := writer.getRandomData() + data, err := writer.getRandomData() + if err != nil { + writer.logger.Error("failed to get random data", "err", err) + return + } key, err := metrics.InvokeAndReportLatency(writer.writeLatencyMetric, func() ([]byte, error) { - return writer.sendRequest(*data) + return writer.sendRequest(data) }) if err != nil { writer.writeFailureMetric.Increment() - writer.logger.Error("failed to send blob request", "err:", err) + writer.logger.Error("failed to send blob request", "err", err) return } writer.writeSuccessMetric.Increment() - checksum := md5.Sum(*data) - writer.unconfirmedKeyHandler.AddUnconfirmedKey(key, checksum, uint(len(*data))) + checksum := md5.Sum(data) + writer.unconfirmedKeyHandler.AddUnconfirmedKey(key, checksum, uint(len(data))) } // getRandomData returns a slice of random data to be used for a blob. -func (writer *BlobWriter) getRandomData() *[]byte { - if *writer.fixedRandomData != nil { - return writer.fixedRandomData +func (writer *BlobWriter) getRandomData() ([]byte, error) { + if writer.fixedRandomData != nil { + return writer.fixedRandomData, nil } data := make([]byte, writer.config.DataSize) _, err := rand.Read(data) if err != nil { - panic(fmt.Sprintf("unable to read random data: %s", err)) + return nil, fmt.Errorf("unable to read random data: %w", err) } data = codec.ConvertByPaddingEmptyByte(data) - return &data + return data, nil } // sendRequest sends a blob to a disperser. -func (writer *BlobWriter) sendRequest(data []byte) ([]byte /* key */, error) { - +func (writer *BlobWriter) sendRequest(data []byte) (key []byte, err error) { ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.config.WriteTimeout) defer cancel() - var key []byte - var err error if writer.config.SignerPrivateKey != "" { _, key, err = writer.disperser.DisperseBlobAuthenticated( ctxTimeout, @@ -157,8 +158,5 @@ func (writer *BlobWriter) sendRequest(data []byte) ([]byte /* key */, error) { data, writer.config.CustomQuorums) } - if err != nil { - return nil, err - } - return key, nil + return } From deea71bdaef0675cb8f1ba12ab6da3d9289c3975 Mon Sep 17 00:00:00 2001 From: Cody Littley Date: Fri, 16 Aug 2024 08:43:03 -0500 Subject: [PATCH 10/10] Made suggested changes. Signed-off-by: Cody Littley --- tools/traffic/workers/blob_writer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index b2cefe17de..a30a7e5bdd 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -7,7 +7,7 @@ import ( "fmt" "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/encoding/utils/codec" - config2 "github.com/Layr-Labs/eigenda/tools/traffic/config" + "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" "github.com/Layr-Labs/eigensdk-go/logging" "sync" @@ -26,7 +26,7 @@ type BlobWriter struct { logger logging.Logger // Config contains the configuration for the generator. - config *config2.WorkerConfig + config *config.WorkerConfig // disperser is the client used to send blobs to the disperser. disperser clients.DisperserClient @@ -52,7 +52,7 @@ func NewBlobWriter( ctx *context.Context, waitGroup *sync.WaitGroup, logger logging.Logger, - config *config2.WorkerConfig, + config *config.WorkerConfig, disperser clients.DisperserClient, unconfirmedKeyHandler KeyHandler, generatorMetrics metrics.Metrics) BlobWriter {