-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Split blob writer code out of larger PR. #685
Changes from 1 commit
2268e52
72b57ba
6c862c2
1508a26
cd9da4d
845477e
94a2c29
38288fc
656294e
deea71b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package test | ||
|
||
import ( | ||
"context" | ||
"crypto/md5" | ||
"fmt" | ||
"github.com/Layr-Labs/eigenda/common" | ||
tu "github.com/Layr-Labs/eigenda/common/testutils" | ||
"github.com/Layr-Labs/eigenda/encoding/utils/codec" | ||
"github.com/Layr-Labs/eigenda/tools/traffic/config" | ||
"github.com/Layr-Labs/eigenda/tools/traffic/metrics" | ||
"github.com/Layr-Labs/eigenda/tools/traffic/workers" | ||
"github.com/stretchr/testify/assert" | ||
"golang.org/x/exp/rand" | ||
"sync" | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestBlobWriter(t *testing.T) { | ||
tu.InitializeRandom() | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
waitGroup := sync.WaitGroup{} | ||
logger, err := common.NewLogger(common.DefaultLoggerConfig()) | ||
assert.Nil(t, err) | ||
startTime := time.Unix(rand.Int63()%2_000_000_000, 0) | ||
ticker := newMockTicker(startTime) | ||
|
||
dataSize := rand.Uint64()%1024 + 64 | ||
|
||
authenticated := rand.Intn(2) == 0 | ||
var signerPrivateKey string | ||
if authenticated { | ||
signerPrivateKey = "asdf" | ||
} | ||
|
||
randomizeBlobs := rand.Intn(2) == 0 | ||
|
||
useCustomQuorum := rand.Intn(2) == 0 | ||
var customQuorum []uint8 | ||
if useCustomQuorum { | ||
customQuorum = []uint8{1, 2, 3} | ||
} | ||
|
||
config := &config.WorkerConfig{ | ||
DataSize: dataSize, | ||
SignerPrivateKey: signerPrivateKey, | ||
RandomizeBlobs: randomizeBlobs, | ||
CustomQuorums: customQuorum, | ||
} | ||
|
||
lock := sync.Mutex{} | ||
|
||
disperserClient := newMockDisperserClient(t, &lock, authenticated) | ||
unconfirmedKeyHandler := newMockKeyHandler(t, &lock) | ||
|
||
generatorMetrics := metrics.NewMockMetrics() | ||
|
||
writer := workers.NewBlobWriter( | ||
&ctx, | ||
&waitGroup, | ||
logger, | ||
ticker, | ||
config, | ||
disperserClient, | ||
unconfirmedKeyHandler, | ||
generatorMetrics) | ||
writer.Start() | ||
|
||
errorProbability := 0.1 | ||
errorCount := 0 | ||
|
||
var previousData []byte | ||
|
||
for i := 0; i < 100; i++ { | ||
if rand.Float64() < errorProbability { | ||
disperserClient.DispenseErrorToReturn = fmt.Errorf("intentional error for testing purposes") | ||
errorCount++ | ||
} else { | ||
disperserClient.DispenseErrorToReturn = nil | ||
} | ||
|
||
// This is the key that will be assigned to the next blob. | ||
disperserClient.KeyToReturn = make([]byte, 32) | ||
_, err = rand.Read(disperserClient.KeyToReturn) | ||
assert.Nil(t, err) | ||
|
||
// Move time forward, allowing the writer to attempt to send a blob. | ||
ticker.Tick(1 * time.Second) | ||
|
||
// Wait until the writer finishes its work. | ||
tu.AssertEventuallyTrue(t, func() bool { | ||
lock.Lock() | ||
defer lock.Unlock() | ||
return int(disperserClient.DisperseCount) > i && int(unconfirmedKeyHandler.Count)+errorCount > i | ||
}, time.Second) | ||
|
||
// These methods should be called exactly once per tick if there are no errors. | ||
// In the presence of errors, nothing should be passed to the unconfirmed key handler. | ||
assert.Equal(t, uint(i+1), disperserClient.DisperseCount) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can also directly match if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've switched over to your recommended style for mock objects. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Timing/synchronizing with another goroutine might be tricky just using time. Does this reliably pass? How do you know if L101 will execute after disperser request has been made? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Empirically, this was reliably passing (ran it in a loop a few thousand times). Primary reason why I was going through all the trouble of doing it in an attempt to put the tests in another directory without making methods public that shouldn't be public. I understand your general concern though. I refactored this so that I don't rely on timing. As a result, the test files will end up being in the same package as the other files. Ugly to me, but it seems like a fairly normal practice in go. |
||
assert.Equal(t, uint(i+1-errorCount), unconfirmedKeyHandler.Count) | ||
|
||
// This method should not be called in this test. | ||
assert.Equal(t, uint(0), disperserClient.GetStatusCount) | ||
|
||
if disperserClient.DispenseErrorToReturn == nil { | ||
assert.NotNil(t, disperserClient.ProvidedData) | ||
assert.Equal(t, customQuorum, disperserClient.ProvidedQuorum) | ||
|
||
// Strip away the extra encoding bytes. We should have data of the expected size. | ||
decodedData := codec.RemoveEmptyByteFromPaddedBytes(disperserClient.ProvidedData) | ||
assert.Equal(t, dataSize, uint64(len(decodedData))) | ||
|
||
// Verify that the proper data was sent to the unconfirmed key handler. | ||
assert.Equal(t, uint(len(disperserClient.ProvidedData)), unconfirmedKeyHandler.ProvidedSize) | ||
checksum := md5.Sum(disperserClient.ProvidedData) | ||
assert.Equal(t, checksum, unconfirmedKeyHandler.ProvidedChecksum) | ||
assert.Equal(t, disperserClient.KeyToReturn, unconfirmedKeyHandler.ProvidedKey) | ||
|
||
// Verify that data has the proper amount of randomness. | ||
if previousData != nil { | ||
if randomizeBlobs { | ||
// We expect each blob to be different. | ||
assert.NotEqual(t, previousData, disperserClient.ProvidedData) | ||
} else { | ||
// We expect each blob to be the same. | ||
assert.Equal(t, previousData, disperserClient.ProvidedData) | ||
} | ||
} | ||
previousData = disperserClient.ProvidedData | ||
} | ||
|
||
// Verify metrics. | ||
assert.Equal(t, float64(i+1-errorCount), generatorMetrics.GetCount("write_success")) | ||
assert.Equal(t, float64(errorCount), generatorMetrics.GetCount("write_failure")) | ||
} | ||
|
||
cancel() | ||
tu.ExecuteWithTimeout(func() { | ||
waitGroup.Wait() | ||
}, time.Second) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
package test | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For mocks, you can enforce they adhere to some interface by doing something like this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Neat trick! I actually noticed a few similar lines in other files and was confused by them. |
||
|
||
import ( | ||
"context" | ||
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" | ||
"github.com/Layr-Labs/eigenda/disperser" | ||
"github.com/stretchr/testify/assert" | ||
"sync" | ||
"testing" | ||
) | ||
|
||
type mockDisperserClient struct { | ||
t *testing.T | ||
// if true, DisperseBlobAuthenticated is expected to be used, otherwise DisperseBlob is expected to be used | ||
authenticated bool | ||
|
||
// The next status, key, and error to return from DisperseBlob or DisperseBlobAuthenticated | ||
StatusToReturn disperser.BlobStatus | ||
KeyToReturn []byte | ||
DispenseErrorToReturn error | ||
|
||
// The previous values passed to DisperseBlob or DisperseBlobAuthenticated | ||
ProvidedData []byte | ||
ProvidedQuorum []uint8 | ||
|
||
// Incremented each time DisperseBlob or DisperseBlobAuthenticated is called. | ||
DisperseCount uint | ||
|
||
// A map from key (in string form) to the status to return from GetBlobStatus. If nil, then an error is returned. | ||
StatusMap map[string]disperser_rpc.BlobStatus | ||
|
||
// Incremented each time GetBlobStatus is called. | ||
GetStatusCount uint | ||
|
||
lock *sync.Mutex | ||
} | ||
|
||
func newMockDisperserClient(t *testing.T, lock *sync.Mutex, authenticated bool) *mockDisperserClient { | ||
return &mockDisperserClient{ | ||
t: t, | ||
lock: lock, | ||
authenticated: authenticated, | ||
StatusMap: make(map[string]disperser_rpc.BlobStatus), | ||
} | ||
} | ||
|
||
func (m *mockDisperserClient) DisperseBlob( | ||
ctx context.Context, | ||
data []byte, | ||
customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { | ||
|
||
m.lock.Lock() | ||
defer m.lock.Unlock() | ||
|
||
assert.False(m.t, m.authenticated, "writer configured to use non-authenticated disperser method") | ||
m.ProvidedData = data | ||
m.ProvidedQuorum = customQuorums | ||
m.DisperseCount++ | ||
return &m.StatusToReturn, m.KeyToReturn, m.DispenseErrorToReturn | ||
} | ||
|
||
func (m *mockDisperserClient) DisperseBlobAuthenticated( | ||
ctx context.Context, | ||
data []byte, | ||
customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { | ||
|
||
m.lock.Lock() | ||
defer m.lock.Unlock() | ||
|
||
assert.True(m.t, m.authenticated, "writer configured to use authenticated disperser method") | ||
m.ProvidedData = data | ||
m.ProvidedQuorum = customQuorums | ||
m.DisperseCount++ | ||
return &m.StatusToReturn, m.KeyToReturn, m.DispenseErrorToReturn | ||
} | ||
|
||
func (m *mockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { | ||
m.lock.Lock() | ||
defer m.lock.Unlock() | ||
|
||
status := m.StatusMap[string(key)] | ||
m.GetStatusCount++ | ||
|
||
return &disperser_rpc.BlobStatusReply{ | ||
Status: status, | ||
Info: &disperser_rpc.BlobInfo{ | ||
BlobVerificationProof: &disperser_rpc.BlobVerificationProof{ | ||
BatchMetadata: &disperser_rpc.BatchMetadata{}, | ||
}, | ||
}, | ||
}, nil | ||
} | ||
|
||
func (m *mockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { | ||
panic("this method should not be called by the generator utility") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package test | ||
|
||
import "time" | ||
|
||
// mockTicker is a deterministic implementation of the InterceptableTicker interface. | ||
type mockTicker struct { | ||
channel chan time.Time | ||
now time.Time | ||
} | ||
|
||
// newMockTicker creates a new InterceptableTicker that can be deterministically controlled in tests. | ||
func newMockTicker(now time.Time) *mockTicker { | ||
return &mockTicker{ | ||
channel: make(chan time.Time), | ||
now: now, | ||
} | ||
} | ||
|
||
func (m *mockTicker) GetTimeChannel() <-chan time.Time { | ||
return m.channel | ||
} | ||
|
||
// Tick advances the ticker by the specified duration. | ||
func (m *mockTicker) Tick(elapsedTime time.Duration) { | ||
m.now = m.now.Add(elapsedTime) | ||
m.channel <- m.now | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package test | ||
|
||
import ( | ||
"sync" | ||
"testing" | ||
) | ||
|
||
// mockKeyHandler is a stand-in for the blob verifier's UnconfirmedKeyHandler. | ||
type mockKeyHandler struct { | ||
t *testing.T | ||
|
||
ProvidedKey []byte | ||
ProvidedChecksum [16]byte | ||
ProvidedSize uint | ||
|
||
// Incremented each time AddUnconfirmedKey is called. | ||
Count uint | ||
|
||
lock *sync.Mutex | ||
} | ||
|
||
func newMockKeyHandler(t *testing.T, lock *sync.Mutex) *mockKeyHandler { | ||
return &mockKeyHandler{ | ||
t: t, | ||
lock: lock, | ||
} | ||
} | ||
|
||
func (m *mockKeyHandler) AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) { | ||
m.lock.Lock() | ||
defer m.lock.Unlock() | ||
|
||
m.ProvidedKey = key | ||
m.ProvidedChecksum = checksum | ||
m.ProvidedSize = size | ||
|
||
m.Count++ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of testing the error cases probabilistically, why don't we have an explicit test case that tests the failures?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed this to non-random, it now generates an error every tenth cycle:
if i%10 == 0
. Would you still like to se a stand alone unit test for the error case, or is this sufficient?