-
Notifications
You must be signed in to change notification settings - Fork 58
/
Copy pathrevalidator.go
372 lines (316 loc) · 12.8 KB
/
revalidator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
package requestvalidation
import (
"context"
"errors"
"sync"
logging "github.com/ipfs/go-log/v2"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/migrations"
)
var log = logging.Logger("markets-rtvl-reval")
// RevalidatorEnvironment are the dependencies needed to
// build the logic of revalidation -- essentially, access to the node at statemachines
type RevalidatorEnvironment interface {
Node() rm.RetrievalProviderNode
SendEvent(dealID rm.ProviderDealIdentifier, evt rm.ProviderEvent, args ...interface{}) error
Get(dealID rm.ProviderDealIdentifier) (rm.ProviderDealState, error)
}
type channelData struct {
dealID rm.ProviderDealIdentifier
totalSent uint64
totalPaidFor uint64
interval uint64
pricePerByte abi.TokenAmount
reload bool
legacyProtocol bool
}
// ProviderRevalidator defines data transfer revalidation logic in the context of
// a provider for a retrieval deal
type ProviderRevalidator struct {
env RevalidatorEnvironment
trackedChannelsLk sync.RWMutex
trackedChannels map[datatransfer.ChannelID]*channelData
}
// NewProviderRevalidator returns a new instance of a ProviderRevalidator
func NewProviderRevalidator(env RevalidatorEnvironment) *ProviderRevalidator {
return &ProviderRevalidator{
env: env,
trackedChannels: make(map[datatransfer.ChannelID]*channelData),
}
}
// TrackChannel indicates a retrieval deal tracked by this provider. It associates
// a given channel ID with a retrieval deal, so that checks run for data sent
// on the channel
func (pr *ProviderRevalidator) TrackChannel(deal rm.ProviderDealState) {
if deal.ChannelID == nil {
return
}
pr.trackedChannelsLk.Lock()
defer pr.trackedChannelsLk.Unlock()
pr.trackedChannels[*deal.ChannelID] = &channelData{
dealID: deal.Identifier(),
}
pr.writeDealState(deal)
}
// UntrackChannel indicates a retrieval deal is finish and no longer is tracked
// by this provider
func (pr *ProviderRevalidator) UntrackChannel(deal rm.ProviderDealState) {
// Sanity check
if deal.ChannelID == nil {
log.Errorf("cannot untrack deal %s: channel ID is nil", deal.ID)
return
}
pr.trackedChannelsLk.Lock()
defer pr.trackedChannelsLk.Unlock()
delete(pr.trackedChannels, *deal.ChannelID)
}
func (pr *ProviderRevalidator) loadDealState(channel *channelData) error {
if !channel.reload {
return nil
}
deal, err := pr.env.Get(channel.dealID)
if err != nil {
return err
}
pr.writeDealState(deal)
channel.reload = false
return nil
}
func (pr *ProviderRevalidator) writeDealState(deal rm.ProviderDealState) {
channel := pr.trackedChannels[*deal.ChannelID]
channel.totalSent = deal.TotalSent
if !deal.PricePerByte.IsZero() {
channel.totalPaidFor = big.Div(big.Max(big.Sub(deal.FundsReceived, deal.UnsealPrice), big.Zero()), deal.PricePerByte).Uint64()
}
channel.interval = deal.CurrentInterval
channel.pricePerByte = deal.PricePerByte
channel.legacyProtocol = deal.LegacyProtocol
}
// Revalidate revalidates a request with a new voucher
func (pr *ProviderRevalidator) Revalidate(channelID datatransfer.ChannelID, voucher datatransfer.Voucher) (datatransfer.VoucherResult, error) {
pr.trackedChannelsLk.RLock()
defer pr.trackedChannelsLk.RUnlock()
channel, ok := pr.trackedChannels[channelID]
if !ok {
return nil, nil
}
// read payment, or fail
payment, ok := voucher.(*rm.DealPayment)
var legacyProtocol bool
if !ok {
legacyPayment, ok := voucher.(*migrations.DealPayment0)
if !ok {
return nil, errors.New("wrong voucher type")
}
newPayment := migrations.MigrateDealPayment0To1(*legacyPayment)
payment = &newPayment
legacyProtocol = true
}
response, err := pr.processPayment(channel.dealID, payment)
if err == nil || err == datatransfer.ErrResume {
channel.reload = true
}
return finalResponse(response, legacyProtocol), err
}
func (pr *ProviderRevalidator) processPayment(dealID rm.ProviderDealIdentifier, payment *rm.DealPayment) (*retrievalmarket.DealResponse, error) {
tok, _, err := pr.env.Node().GetChainHead(context.TODO())
if err != nil {
_ = pr.env.SendEvent(dealID, rm.ProviderEventSaveVoucherFailed, err)
return errorDealResponse(dealID, err), err
}
deal, err := pr.env.Get(dealID)
if err != nil {
return errorDealResponse(dealID, err), err
}
// Save voucher
received, err := pr.env.Node().SavePaymentVoucher(context.TODO(), payment.PaymentChannel, payment.PaymentVoucher, nil, big.Zero(), tok)
if err != nil {
_ = pr.env.SendEvent(dealID, rm.ProviderEventSaveVoucherFailed, err)
return errorDealResponse(dealID, err), err
}
totalPaid := big.Add(deal.FundsReceived, received)
// check if all payments are received to continue the deal, or send updated required payment
owed := paymentOwed(deal, totalPaid)
log.Debugf("provider: owed %d: received voucher for %d, total received %d = received so far %d + newly received %d, total sent %d, unseal price %d, price per byte %d",
owed, payment.PaymentVoucher.Amount, totalPaid, deal.FundsReceived, received, deal.TotalSent, deal.UnsealPrice, deal.PricePerByte)
if owed.GreaterThan(big.Zero()) {
log.Debugf("provider: owed %d: sending partial payment request", owed)
_ = pr.env.SendEvent(dealID, rm.ProviderEventPartialPaymentReceived, received)
return &rm.DealResponse{
ID: deal.ID,
Status: deal.Status,
PaymentOwed: owed,
}, datatransfer.ErrPause
}
// resume deal
_ = pr.env.SendEvent(dealID, rm.ProviderEventPaymentReceived, received)
if deal.Status == rm.DealStatusFundsNeededLastPayment {
log.Debugf("provider: funds needed: last payment")
return &rm.DealResponse{
ID: deal.ID,
Status: rm.DealStatusCompleted,
}, datatransfer.ErrResume
}
// We shouldn't resume the data transfer if we haven't finished unsealing/reading the unsealed data into the
// local block-store.
if deal.Status == rm.DealStatusUnsealing || deal.Status == rm.DealStatusFundsNeededUnseal {
return nil, nil
}
return nil, datatransfer.ErrResume
}
func paymentOwed(deal rm.ProviderDealState, totalPaid big.Int) big.Int {
// Check if the payment covers unsealing
if totalPaid.LessThan(deal.UnsealPrice) {
log.Debugf("provider: total paid %d < unseal price %d", totalPaid, deal.UnsealPrice)
return big.Sub(deal.UnsealPrice, totalPaid)
}
// Calculate how much payment has been made for transferred data
transferPayment := big.Sub(totalPaid, deal.UnsealPrice)
// The provider sends data and the client sends payment for the data.
// The provider will send a limited amount of extra data before receiving
// payment. Given the current limit, check if the client has paid enough
// to unlock the next interval.
currentLimitLower := deal.IntervalLowerBound()
log.Debugf("provider: total sent %d bytes, but require payment for interval lower bound %d bytes",
deal.TotalSent, currentLimitLower)
// Calculate the minimum required payment
totalPaymentRequired := big.Mul(big.NewInt(int64(currentLimitLower)), deal.PricePerByte)
// Calculate payment owed
owed := big.Sub(totalPaymentRequired, transferPayment)
log.Debugf("provider: payment owed %d = payment required %d - transfer paid %d",
owed, totalPaymentRequired, transferPayment)
return owed
}
func errorDealResponse(dealID rm.ProviderDealIdentifier, err error) *rm.DealResponse {
return &rm.DealResponse{
ID: dealID.DealID,
Message: err.Error(),
Status: rm.DealStatusErrored,
}
}
// OnPullDataSent is called on the responder side when more bytes are sent
// for a given pull request. It should return a VoucherResult + ErrPause to
// request revalidation or nil to continue uninterrupted,
// other errors will terminate the request
func (pr *ProviderRevalidator) OnPullDataSent(chid datatransfer.ChannelID, additionalBytesSent uint64) (bool, datatransfer.VoucherResult, error) {
pr.trackedChannelsLk.RLock()
defer pr.trackedChannelsLk.RUnlock()
channel, ok := pr.trackedChannels[chid]
if !ok {
return false, nil, nil
}
err := pr.loadDealState(channel)
if err != nil {
return true, nil, err
}
// Calculate how much data has been sent in total
channel.totalSent += additionalBytesSent
if channel.pricePerByte.IsZero() || channel.totalSent < channel.interval {
if !channel.pricePerByte.IsZero() {
log.Debugf("provider: total sent %d < interval %d, sending block", channel.totalSent, channel.interval)
}
return true, nil, pr.env.SendEvent(channel.dealID, rm.ProviderEventBlockSent, channel.totalSent)
}
// Calculate the payment owed
paymentOwed := big.Mul(abi.NewTokenAmount(int64(channel.totalSent-channel.totalPaidFor)), channel.pricePerByte)
log.Debugf("provider: owed %d = (total sent %d - paid for %d) * price per byte %d: sending payment request",
paymentOwed, channel.totalSent, channel.totalPaidFor, channel.pricePerByte)
// Request payment
err = pr.env.SendEvent(channel.dealID, rm.ProviderEventPaymentRequested, channel.totalSent)
if err != nil {
return true, nil, err
}
return true, finalResponse(&rm.DealResponse{
ID: channel.dealID.DealID,
Status: rm.DealStatusFundsNeeded,
PaymentOwed: paymentOwed,
}, channel.legacyProtocol), datatransfer.ErrPause
}
// OnPushDataReceived is called on the responder side when more bytes are received
// for a given push request. It should return a VoucherResult + ErrPause to
// request revalidation or nil to continue uninterrupted,
// other errors will terminate the request
func (pr *ProviderRevalidator) OnPushDataReceived(chid datatransfer.ChannelID, additionalBytesReceived uint64) (bool, datatransfer.VoucherResult, error) {
return false, nil, nil
}
// OnComplete is called to make a final request for revalidation -- often for the
// purpose of settlement.
// if VoucherResult is non nil, the request will enter a settlement phase awaiting
// a final update
func (pr *ProviderRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, datatransfer.VoucherResult, error) {
pr.trackedChannelsLk.RLock()
defer pr.trackedChannelsLk.RUnlock()
channel, ok := pr.trackedChannels[chid]
if !ok {
return false, nil, nil
}
err := pr.loadDealState(channel)
if err != nil {
return true, nil, err
}
err = pr.env.SendEvent(channel.dealID, rm.ProviderEventBlocksCompleted)
if err != nil {
return true, nil, err
}
// Calculate how much payment is owed
paymentOwed := big.Mul(abi.NewTokenAmount(int64(channel.totalSent-channel.totalPaidFor)), channel.pricePerByte)
if paymentOwed.Equals(big.Zero()) {
return true, finalResponse(&rm.DealResponse{
ID: channel.dealID.DealID,
Status: rm.DealStatusCompleted,
}, channel.legacyProtocol), nil
}
// Send a request for payment
log.Debugf("provider: last payment owed %d = (total sent %d - paid for %d) * price per byte %d",
paymentOwed, channel.totalSent, channel.totalPaidFor, channel.pricePerByte)
err = pr.env.SendEvent(channel.dealID, rm.ProviderEventPaymentRequested, channel.totalSent)
if err != nil {
return true, nil, err
}
return true, finalResponse(&rm.DealResponse{
ID: channel.dealID.DealID,
Status: rm.DealStatusFundsNeededLastPayment,
PaymentOwed: paymentOwed,
}, channel.legacyProtocol), datatransfer.ErrPause
}
func finalResponse(response *rm.DealResponse, legacyProtocol bool) datatransfer.Voucher {
if response == nil {
return nil
}
if legacyProtocol {
downgradedResponse := migrations.DealResponse0{
Status: response.Status,
ID: response.ID,
Message: response.Message,
PaymentOwed: response.PaymentOwed,
}
return &downgradedResponse
}
return response
}
type legacyRevalidator struct {
providerRevalidator *ProviderRevalidator
}
func (lrv *legacyRevalidator) Revalidate(channelID datatransfer.ChannelID, voucher datatransfer.Voucher) (datatransfer.VoucherResult, error) {
return lrv.providerRevalidator.Revalidate(channelID, voucher)
}
func (lrv *legacyRevalidator) OnPullDataSent(chid datatransfer.ChannelID, additionalBytesSent uint64) (bool, datatransfer.VoucherResult, error) {
return false, nil, nil
}
func (lrv *legacyRevalidator) OnPushDataReceived(chid datatransfer.ChannelID, additionalBytesReceived uint64) (bool, datatransfer.VoucherResult, error) {
return false, nil, nil
}
func (lrv *legacyRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, datatransfer.VoucherResult, error) {
return false, nil, nil
}
// NewLegacyRevalidator adds a revalidator that will capture revalidation requests for the legacy protocol but
// won't double count data being sent
// TODO: the data transfer revalidator registration needs to be able to take multiple types to avoid double counting
// for data being sent.
func NewLegacyRevalidator(providerRevalidator *ProviderRevalidator) datatransfer.Revalidator {
return &legacyRevalidator{providerRevalidator: providerRevalidator}
}