-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[payments] disperser server metering #792
Conversation
9755e83
to
0758b5d
Compare
1bd1029
to
d402849
Compare
0758b5d
to
6972c65
Compare
b72877e
to
b085195
Compare
24cc85d
to
256cd70
Compare
eb7a1a3
to
80ec710
Compare
c81aad0
to
5c33742
Compare
core/auth/payment_signer.go
Outdated
|
||
func (s *PaymentSigner) SignBlobPayment(header *commonpb.PaymentHeader) ([]byte, error) { | ||
// Set the account id to the hex encoded public key of the signer | ||
header.AccountId = hex.EncodeToString(crypto.FromECDSAPub(&s.PrivateKey.PublicKey)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is AccountID the public key or the eth account address?
I think there has been some confusion with how we authenticate today. Let's make sure it's clear what we're using (preferably eth account address)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was intended to be the public key, but I've updated it to use the Eth account address
core/data.go
Outdated
return crypto.Keccak256Hash(data) | ||
} | ||
|
||
// Hash returns the Keccak256 hash of the PaymentMetadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConvertPaymentHeader
converts...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks 😅 updated
core/meterer/meterer.go
Outdated
@@ -138,9 +130,10 @@ func (m *Meterer) ValidateQuorum(headerQuorums []uint8, allowedQuorums []uint8) | |||
// ValidateBinIndex checks if the provided bin index is valid | |||
func (m *Meterer) ValidateBinIndex(header core.PaymentMetadata, reservation *core.ActiveReservation) bool { | |||
now := uint64(time.Now().Unix()) | |||
currentBinIndex := GetBinIndex(now, m.ReservationWindow) | |||
reservationWindow := m.ChainState.GetReservationWindow() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it make onchain query every time? should we cache these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't expect this to make onchain query every time. ChainState
tracks a PaymentVaultParams
struct that caches all the contract constants so this should query directly from the cache (function) and I expect onchain reads only when 1) initializing ChainState struct
and 2) refreshed periodically along with registered payments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there some kind of retry on failure mechanism?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nada, although I don't think a regular read from cached struct will have any failure. There's periodic polling against the on-chain contract to refresh the state, so if one poll fails, the next poll will be the next try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol thought ChainState
was this ChainState. Should we just call this OnchainPaymentState
or sth?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep we could do that! updated so it is more clear:)
disperser/apiserver/server.go
Outdated
} | ||
|
||
// dispersePaidBlob checks for payment metering, otherwise does the same thing as disperseBlob | ||
func (s *DispersalServer) dispersePaidBlob(ctx context.Context, blob *core.Blob, quorumNumbers []uint32, binIndex uint32, cumulativePayment *big.Int, signature []byte, authenticatedAddress string, apiMethodName string) (*pb.DisperseBlobReply, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like its mostly copied from disperseBlob
except for the s.meterer != nil
block. Why don't we integrate it into the existing method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't dare mess around disperseBlob
😬 and wanted to keep the two flows cleanly separated from each other. Additionally, dispersePaidBlob
has these extra params quorumNumbers []uint32, binIndex uint32, cumulativePayment *big.Int, signature []byte
.
To make the code cleaner, I could add a PaymentMetadata
param to disperseBlob
and pass in an empty one when calling from DisperseBlob
and construct a real one in DispersePaidBlob
. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make the code cleaner, I could add a PaymentMetadata param to disperseBlob and pass in an empty one when calling from DisperseBlob and construct a real one in DispersePaidBlob. What do you think?
I'd be in favor of something like this to reduce code duplication
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated! removed dispersePaidBlob
and added PaymentMetadata
to disperseBlob
disperser/apiserver/server.go
Outdated
s.logger.Debug("received a new paid blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams", strings.Join(securityParamsStrings, ", ")) | ||
|
||
// payments before ratelimits | ||
if s.meterer != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does it interact with rate limiter? Would there ever be a case where request is limited by both meterer and rate limiter? i.e. is it possible that there's been sufficient amount paid, but is above the rate limit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There won't ever be a case where request is limited by both meterer and rate limiter. If meterer is configured, rate limiter should not be used.
Do we want a different type of behavior? Like, using meterer if there's PaymentMetadata
, rate limiter if not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about something simple like this?
- If the payment metadata is included, we use the meterer. If the meterer is unavailable, we fail.
- If the payment metadata is not included, we use the rate limiter. If rate limiter is unavailable, pass (current behavior)
The DisperseBlobPaid method will always pass the payment metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great, now disperseBlob
will check for payment metadata to use metering, and then ratelimiter if it exists, otherwise continues as usual
seenQuorums := make(map[uint8]struct{}) | ||
|
||
// TODO: validate payment signature against payment metadata | ||
if !auth.VerifyPaymentSignature(req.GetPaymentHeader(), req.GetPaymentSignature()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. The method looks exactly same as validateRequestAndGetBlob
except this check. Could we extend the existing method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem here was the two request types are different and we expect different quorum requirements (reservation doesn't need to be using required quorums, on-demand is automatically assumed to be using required quorums). I was thinking of making the checks in the existing method more modular but didn't want to mess around much... I would propose we can keep this new method and refactor the existing checks. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sg, lets add a TODO here
6f8db25
to
9ed0ed8
Compare
c5006a4
to
4f66eea
Compare
disperser/cmd/apiserver/main.go
Outdated
|
||
offchainStore, err := mt.NewOffchainStore( | ||
config.AwsClientConfig, | ||
"reservations", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these switched to constants in a future PR by chance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, I updated to use config flag (optional with defaults)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should these be read from config?
disperser/apiserver/server_test.go
Outdated
PricePerSymbol: 1, | ||
MinNumSymbols: 1, | ||
GlobalSymbolsPerSecond: 1000, | ||
ReservationWindow: 60, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
outdated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, rebased just now
core/meterer/meterer.go
Outdated
@@ -138,9 +130,10 @@ func (m *Meterer) ValidateQuorum(headerQuorums []uint8, allowedQuorums []uint8) | |||
// ValidateBinIndex checks if the provided bin index is valid | |||
func (m *Meterer) ValidateBinIndex(header core.PaymentMetadata, reservation *core.ActiveReservation) bool { | |||
now := uint64(time.Now().Unix()) | |||
currentBinIndex := GetBinIndex(now, m.ReservationWindow) | |||
reservationWindow := m.ChainState.GetReservationWindow() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there some kind of retry on failure mechanism?
d4dc32b
to
e5c113a
Compare
disperser/apiserver/server.go
Outdated
@@ -209,7 +214,7 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse | |||
} | |||
|
|||
// Disperse the blob | |||
reply, err := s.disperseBlob(ctx, blob, authenticatedAddress, "DisperseBlobAuthenticated") | |||
reply, err := s.disperseBlob(ctx, blob, authenticatedAddress, "DisperseBlobAuthenticated", &core.PaymentMetadata{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pass in nil
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good, updated!
disperser/apiserver/server.go
Outdated
@@ -282,7 +283,13 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut | |||
|
|||
s.logger.Debug("received a new blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams", strings.Join(securityParamsStrings, ", ")) | |||
|
|||
if s.ratelimiter != nil { | |||
// If paymentHeader is not empty, we use the meterer, otherwise we use the ratelimiter if the ratelimiter is available | |||
if paymentHeader != nil && *paymentHeader != (core.PaymentMetadata{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably just check for nil and pass in nil for the non-paid endpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
2ec7396
to
d603716
Compare
if s.ratelimiter != nil { | ||
// If paymentHeader is not empty, we use the meterer, otherwise we use the ratelimiter if the ratelimiter is available | ||
if paymentHeader != nil { | ||
err := s.meterer.MeterRequest(ctx, *blob, *paymentHeader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The blob length is passed in in the next PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, all the changes regarding to our v1/v2 payment discussions were made in the next PR along with the integration test that initiated that discussion. I could repeat the changes here again, thinking it might make intuitive sense in the review process
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was mostly just a note for myself. I think it's fine as is!
@@ -311,6 +318,38 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut | |||
}, nil | |||
} | |||
|
|||
func (s *DispersalServer) DispersePaidBlob(ctx context.Context, req *pb.DispersePaidBlobRequest) (*pb.DisperseBlobReply, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we put this code behind a feature flag? Right now, if we were to release from this code, we'd be running code here which is still incomplete. (The previous PR hits the unimplemented fallback)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 like if paymentFeatureEnabled
do this, otherwise hit unimplemented fallback? the main incomplete part is the contract, could we wait til code complete to make a release?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could a feature flag be useful for rollout plan? I'd imagine we might want to test this on preprod and testnet before mainnet? cc @ian-shim
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes! This should be behind a feature flag. Maybe we can use EnablePaymentMeterer
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good to me:) I added a check with if meterer
is nil then the function fallback. This is because meterer is only set when EnablePaymentMeterer
is true, and wanted to avoid adding additional config to DisperserServer
c317303
to
928ab5e
Compare
core/meterer/meterer.go
Outdated
go func() { | ||
ticker := time.NewTicker(1 * time.Hour) | ||
ticker := time.NewTicker(updateInterval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: would updateInterval
be ever different from m.UpdateInterval
? Should we just use m.UpdateInterval
instead of passing it in as a param?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for pointing it out! updated
Why are these changes needed?
Disperser server integrated with payments metering
Checks