Lsps2 forwarding #125
Conversation
Force-pushed from 140738f to 8f0e251
Force-pushed from 4179176 to 894c80c
I reviewed the first batch of commits. This looks awesome! My main points are around simplification vs. performance tradeoffs.
Will continue to review in batches.
cln_plugin/server.go
Outdated
@@ -164,6 +167,11 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {

    s.htlcStream = stream

    // Replay in-flight htlcs in fifo order
    for pair := s.inflightHtlcs.Oldest(); pair != nil; pair = pair.Next() {
        sendHtlcAccepted(stream, pair.Value)
It seems we ignore errors here. The main concern is that, in case of an error, we will hold this htlc forever (causing the channel to be closed), as there is no retry logic like in the ongoing htlc_accepted hook.
The timeout listener is still running, so if this htlc is not handled, it will time out.
The ongoing htlc_accepted hook waits for a new subscriber to send the message to.
This replay happens at the moment lspd subscribes to the stream. So if the connection breaks, lspd will subscribe again and this part will run again. My assumption was that an error means the stream is broken.
I do agree that on error we should break this loop. I also noticed that the retry mechanism, combined with this replay, will send the same htlc to lspd twice on reconnect. I'll fix that.
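For illustration, a minimal self-contained sketch of that behaviour, assuming the send helper returns an error (the names here are illustrative, not the plugin's actual types):

    package main

    import (
        "errors"
        "fmt"
    )

    // replay sends queued htlcs in fifo order and stops at the first send
    // error. The remaining htlcs stay queued, so they are replayed when
    // lspd subscribes to the stream again.
    func replay(queue []string, send func(string) error) {
        for _, htlc := range queue {
            if err := send(htlc); err != nil {
                fmt.Println("replay stopped, stream assumed broken:", err)
                break
            }
        }
    }

    func main() {
        errBroken := errors.New("stream closed")
        replay([]string{"htlc-1", "htlc-2", "htlc-3"}, func(h string) error {
            if h == "htlc-2" {
                return errBroken // simulate the stream breaking mid-replay
            }
            fmt.Println("replayed", h)
            return nil
        })
    }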
Actually, we're not tracking the timeout anymore when the htlc is already sent. Good catch.
This should be fixed now in the latest commit: efec935
        return true
    }

    if time.Now().UTC().After(t) {
NIT: Do we need the UTC conversion here?
ValidUntil is UTC, so in order to check for expiry, this needs UTC as well. My expiry tests failed before because this wasn't UTC.
lsps2/intercept_handler.go
Outdated
    // Fetch the buy registration in a goroutine, to avoid blocking the
    // event loop.
    go i.fetchRegistration(part.req.PaymentId(), part.req.Scid)
The tradeoff of using a goroutine here is that it makes the workflow more complicated. I think if we don't use a goroutine here, we won't need these three channels: awaitingRegistration, registrationReady and notRegistered, which will make the flow more readable and in context.
The disadvantage of having a (very fast) database query looks negligible to me.
I did code it that way at first. Moving the database query into a goroutine indeed added the channels you mention. The problem is that a clogged event loop is a fatal issue that's very hard to debug. This event loop processes all htlcs on the node, not just the ones meant for channel creation. If new htlcs keep being added and the database query does turn out to be slow for some reason, the loop keeps accumulating new htlcs and potentially doesn't handle resolutions for channel opening, or processes them very slowly. That can lead to timeouts on the sender (weird behavior), cause us to open unused channels (costly), and in the case of an attack may lead to loss of funds due to underlying htlc timeouts (maybe cln has a mechanism to prevent this, not sure).
So even though it's complicated, I think it's warranted for the adversarial scenario, tbh.
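As a self-contained illustration of the pattern under discussion (all names are hypothetical, not lspd's actual types): an event loop that offloads the slow lookup to a goroutine and receives the result back on a channel stays responsive to new htlcs, while all state changes still happen on the loop's single goroutine:

    package main

    import (
        "fmt"
        "time"
    )

    type htlc struct{ paymentId string }

    type registrationResult struct {
        paymentId  string
        registered bool
    }

    // slowLookup stands in for the database query for the buy registration.
    func slowLookup(paymentId string) registrationResult {
        time.Sleep(500 * time.Millisecond)
        return registrationResult{paymentId: paymentId, registered: true}
    }

    func eventLoop(htlcs <-chan htlc, done chan<- struct{}) {
        results := make(chan registrationResult)
        for {
            select {
            case h := <-htlcs:
                // Offload the lookup so the loop keeps draining new htlcs.
                go func(h htlc) { results <- slowLookup(h.paymentId) }(h)
            case r := <-results:
                // Back on the event loop: safe to update payment state, no locks.
                fmt.Println("registration ready:", r.paymentId, r.registered)
                done <- struct{}{}
            }
        }
    }

    func main() {
        htlcs := make(chan htlc)
        done := make(chan struct{}, 3)
        go eventLoop(htlcs, done)
        for i := 0; i < 3; i++ {
            htlcs <- htlc{paymentId: fmt.Sprintf("payment-%d", i)}
        }
        for i := 0; i < 3; i++ {
            <-done
        }
    }

The three lookups overlap, so the whole run takes roughly one lookup delay instead of three.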
That's a good point. I will take a look again to see if we can simplify without risking the regular flow.
I now understand the logic behind the decision to use a goroutine for the db query, but in practice I don't think we have had such an issue with the db, and the simplicity of a single channel seems like a great advantage. We can still sleep on it before making any change. Maybe @yaslama can share some insights.
We worked with locks before, based on the payment hash, which means a lot of work can be done in parallel. A slow db operation doesn't affect the general flow then. I added a test here (lines 707 to 760 in c6a5798):
    func Test_Mpp_Performance(t *testing.T) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        paymentCount := 100
        partCount := 10
        store := &mockLsps2Store{
            delay:         time.Millisecond * 500,
            registrations: make(map[uint64]*BuyRegistration),
        }
        client := &mockLightningClient{}
        for paymentNo := 0; paymentNo < paymentCount; paymentNo++ {
            scid := uint64(paymentNo + 1_000_000)
            client.getChanResponses = append(client.getChanResponses, defaultChanResult)
            client.openResponses = append(client.openResponses, defaultOutPoint)
            store.registrations[scid] = &BuyRegistration{
                PeerId:           strconv.FormatUint(scid, 10),
                Scid:             lightning.ShortChannelID(scid),
                Mode:             OpeningMode_MppFixedInvoice,
                OpeningFeeParams: defaultOpeningFeeParams(),
                PaymentSizeMsat:  &defaultPaymentSizeMsat,
            }
        }
        i := setupInterceptor(ctx, &interceptP{store: store, client: client})
        var wg sync.WaitGroup
        wg.Add(partCount * paymentCount)
        start := time.Now()
        for paymentNo := 0; paymentNo < paymentCount; paymentNo++ {
            for partNo := 0; partNo < partCount; partNo++ {
                scid := paymentNo + 1_000_000
                id := fmt.Sprintf("%d|%d", paymentNo, partNo)
                var a [8]byte
                binary.BigEndian.PutUint64(a[:], uint64(scid))
                ph := sha256.Sum256(a[:])
                go func() {
                    res := i.Intercept(createPart(&part{
                        scid: uint64(scid),
                        id:   id,
                        ph:   ph[:],
                        amt:  defaultPaymentSizeMsat / uint64(partCount),
                    }))
                    assert.Equal(t, shared.INTERCEPT_RESUME_WITH_ONION, res.Action)
                    wg.Done()
                }()
            }
        }
        wg.Wait()
        end := time.Now()
        assert.LessOrEqual(t, end.Sub(start).Milliseconds(), int64(1000))
        assertEmpty(t, i)
    }
It consists of 100 payments of 10 parts each, with the db delay set to 500ms. That takes around 50 seconds to complete in total without the goroutine, and around 500ms with the goroutine.
It would be the same for 100 payments of 1 part each: that would also take 50 seconds, with a 500ms delay for the fetchRegistration call.
I am pretty sure that this indexed query will be executed in no more than 5 milliseconds. Most probably no io is even needed at the db level, because it will hit the db's in-memory cache.
I am not arguing about the performance; I agree your approach is faster, only about the tradeoffs.
Did you consider still using goroutines for parallel execution, but one goroutine for the whole processing of the htlc? So effectively use one channel?
It's the fact that you have to wait for other htlcs to be processed in order to continue the current htlc that made me come up with this design. It won't really become less complicated if every htlc is processed in its own goroutine. Now the complexity is in the htlc processing stages on the event loop; if you process them in parallel, the complexity will be in handling locks and races. What I like about this event loop is that you don't have to worry about races, because every stage is handled one by one. Everywhere you need synchronization, like updating the payment state or part state, or finalizing the htlcs, you can be certain there is no other thread modifying the same values.
BTW, you do mention an 'indexed query'. I agree things should be fast if everything is indexed correctly. But if there's network maintenance, or a query is not indexed properly, or there's even some read lock on the db somewhere that runs for a while, that's problematic in this design. We could try another design, with a single goroutine per htlc, using locks and waitgroups. That would take away the weird handling of the htlc in stages. I'm not sure whether it would be less complicated though.
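For comparison, a self-contained sketch of that alternative (illustrative names only, not a proposed implementation): one goroutine per htlc part, free to block on the db, with a mutex guarding the shared payment state instead of event-loop stages:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type payment struct {
        mu           sync.Mutex
        receivedMsat uint64
    }

    // handlePart processes a single htlc part on its own goroutine. Blocking
    // on the db is harmless here, but every access to shared payment state
    // needs the lock, and races become possible wherever it is forgotten.
    func handlePart(p *payment, amtMsat uint64, wg *sync.WaitGroup) {
        defer wg.Done()
        time.Sleep(50 * time.Millisecond) // stands in for the db lookup

        p.mu.Lock()
        p.receivedMsat += amtMsat
        p.mu.Unlock()
    }

    func main() {
        p := &payment{}
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go handlePart(p, 100_000, &wg)
        }
        wg.Wait()
        fmt.Println("received msat:", p.receivedMsat) // 1000000
    }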
I see your point. Keep in mind that network maintenance or a db not responding will, in the current design, keep spinning up new goroutines that hang.
Let's discuss tomorrow. I think we have all we need to agree on the approach!
Force-pushed from 87d3808 to c6a5798
Added two commits:
    // Update the new part to processed, if the replaced part was processed
    // already.
    part.isProcessed = existingPart.isProcessed
    return
If we end up here, does fetchRegistration need to be executed? Currently it seems it is executed even though the part has already been handled.
Only the first part that arrives for a given payment fetches the registration.
The first part adds the payment to inflightPayments and runs fetchRegistration in the goroutine.
For later parts (also replacements) the db query is not executed; we only wait for the registration to be set on the payment.
This part is a replacement, which means there is already some work to be done for that part on a queue somewhere. That work continues where it is at that point, so there's no need to put the work on a queue again.
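A self-contained sketch of that rule (the name inflightPayments follows the discussion; the types are illustrative): only the first part of a payment triggers the fetch, later parts and replacements just attach to the in-flight payment:

    package main

    import "fmt"

    type payment struct {
        parts map[string]bool
    }

    type interceptor struct {
        inflightPayments map[string]*payment
        fetches          int
    }

    func (i *interceptor) addPart(paymentId, partId string) {
        p, ok := i.inflightPayments[paymentId]
        if !ok {
            // First part for this payment: track it and fetch the
            // registration exactly once (in lspd this would be the
            // `go i.fetchRegistration(...)` call).
            p = &payment{parts: make(map[string]bool)}
            i.inflightPayments[paymentId] = p
            i.fetches++
        }
        // Replacements overwrite the existing entry; no extra fetch is queued.
        p.parts[partId] = true
    }

    func main() {
        i := &interceptor{inflightPayments: make(map[string]*payment)}
        i.addPart("pay-1", "part-1")
        i.addPart("pay-1", "part-2")
        i.addPart("pay-1", "part-1") // replacement of part-1
        fmt.Println("registration fetches:", i.fetches) // 1
    }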
Force-pushed from a24f5ff to a58c990
Updated the PR to have a much more simplified version of the event loop.
@roeierez I don't think a structure with locks is viable. It's much more complicated.
@@ -15,3 +15,18 @@ CREATE TABLE lsps2.buy_registrations (
    );
    CREATE UNIQUE INDEX idx_lsps2_buy_registrations_scid ON lsps2.buy_registrations (scid);
    CREATE INDEX idx_lsps2_buy_registrations_valid_until ON lsps2.buy_registrations (params_valid_until);

    CREATE TABLE lsps2.bought_channels (
        id bigserial PRIMARY KEY,
It's perhaps better to use uuid7 instead (there is a pg extension to generate them, but it's better imo to use a go generator like https://github.com/GoWebProd/uuid7).
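For illustration, a sketch of generating a time-ordered v7 id in Go. It uses github.com/google/uuid (which also provides v7 generation) rather than the library linked above, so treat the package choice as an assumption, not a recommendation from this thread:

    package main

    import (
        "fmt"

        "github.com/google/uuid"
    )

    func main() {
        // UUIDv7 ids are time-ordered, so they keep reasonable index
        // locality in Postgres while letting the application generate the
        // keys, instead of relying on a bigserial sequence.
        id, err := uuid.NewV7()
        if err != nil {
            panic(err)
        }
        fmt.Println(id) // would be inserted into a `uuid PRIMARY KEY` column
    }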
    CREATE TABLE lsps2.bought_channels (
        id bigserial PRIMARY KEY,
        registration_id bigint NOT NULL,
Here also uuid7 is perhaps better
@@ -3,26 +3,13 @@ package interceptor
    import (
        "time"

        "github.com/breez/lspd/shared"
I know it's a nit about naming and it's already in the branch, but I don't think that names like common or lncommon convey the meaning any better than "shared".
I'll rename that in a separate PR
Adds the htlc forwarding component for the LSPS2 implementation.
The forwarding logic runs in an event loop to avoid extensive locking with mutexes.
This is a rather large PR, please review carefully; it will take time to review. Any nits are welcome.
The main part that is added is lsps2/intercept_handler.go, which is the interceptor for lsps2.
The old interceptor now has the same signature as the new one, and the magic of combining them is in shared/combined_handler.go.
Some parts of this PR might make it hard to review.
If those changes make it too complicated to review, I can put them in separate PRs.
TODO