From ead11ab36a7b7f827b6b8b02b85f06cc3cf7695d Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Fri, 5 Jan 2024 12:34:39 +0100 Subject: [PATCH] lsps2: lnd support --- .github/workflows/integration_tests.yaml | 1 + itest/lnd_breez_client.go | 66 ++++++++-- itest/lspd_test.go | 2 +- lnd/custom_msg_client.go | 149 +++++++++++++++++++++++ lnd/interceptor.go | 5 +- main.go | 51 +++++--- 6 files changed, 243 insertions(+), 31 deletions(-) create mode 100644 lnd/custom_msg_client.go diff --git a/.github/workflows/integration_tests.yaml b/.github/workflows/integration_tests.yaml index e2f82fe0..b136366f 100644 --- a/.github/workflows/integration_tests.yaml +++ b/.github/workflows/integration_tests.yaml @@ -158,6 +158,7 @@ jobs: testLsps2ZeroConfUtxo ] lsp: [ + LND, CLN ] client: [ diff --git a/itest/lnd_breez_client.go b/itest/lnd_breez_client.go index c86be474..f6da55d9 100644 --- a/itest/lnd_breez_client.go +++ b/itest/lnd_breez_client.go @@ -2,8 +2,11 @@ package itest import ( "context" + "encoding/hex" "flag" + "log" "sync" + "time" "github.com/breez/lntest" "github.com/breez/lntest/lnd" @@ -15,11 +18,12 @@ var lndMobileExecutable = flag.String( ) type lndBreezClient struct { - name string - harness *lntest.TestHarness - node *lntest.LndNode - cancel context.CancelFunc - mtx sync.Mutex + name string + harness *lntest.TestHarness + node *lntest.LndNode + customMsgQueue chan *lntest.CustomMsgRequest + cancel context.CancelFunc + mtx sync.Mutex } func newLndBreezClient(h *lntest.TestHarness, m *lntest.Miner, name string) BreezClient { @@ -63,6 +67,8 @@ func (c *lndBreezClient) Start() { ctx, cancel := context.WithCancel(c.harness.Ctx) c.cancel = cancel go c.startChannelAcceptor(ctx) + c.customMsgQueue = make(chan *lntest.CustomMsgRequest, 100) + c.startCustomMsgListener(ctx) } func (c *lndBreezClient) Stop() error { @@ -86,9 +92,51 @@ func (c *lndBreezClient) SetHtlcAcceptor(totalMsat uint64) { // No need for a htlc acceptor in the LND breez client } +func (c *lndBreezClient) startCustomMsgListener(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + } + + if !c.node.IsStarted() { + log.Printf("%s: cannot listen to custom messages, node is not started.", c.name) + break + } + + listener, err := c.node.LightningClient().SubscribeCustomMessages( + ctx, + &lnd.SubscribeCustomMessagesRequest{}, + ) + if err != nil { + log.Printf("%s: client.SubscribeCustomMessages() error: %v", c.name, err) + break + } + for { + if ctx.Err() != nil { + return + } + msg, err := listener.Recv() + if err != nil { + log.Printf("%s: listener.Recv() error: %v", c.name, err) + break + } + + c.customMsgQueue <- &lntest.CustomMsgRequest{ + PeerId: hex.EncodeToString(msg.Peer), + Type: msg.Type, + Data: msg.Data, + } + } + } + }() +} + func (c *lndBreezClient) ReceiveCustomMessage() *lntest.CustomMsgRequest { - // TODO: Not implemented. - return nil + msg := <-c.customMsgQueue + return msg } func (c *lndBreezClient) startChannelAcceptor(ctx context.Context) error { @@ -98,6 +146,10 @@ func (c *lndBreezClient) startChannelAcceptor(ctx context.Context) error { } for { + if ctx.Err() != nil { + return ctx.Err() + } + request, err := client.Recv() if err != nil { return err diff --git a/itest/lspd_test.go b/itest/lspd_test.go index 87bf0040..9c0cecb6 100644 --- a/itest/lspd_test.go +++ b/itest/lspd_test.go @@ -24,7 +24,7 @@ func TestLspd(t *testing.T) { lndTestCases = append(lndTestCases, c) } } - runTests(t, lndTestCases, "LND-lsp-CLN-client", lndLspFunc, clnClientFunc) + runTests(t, testCases, "LND-lsp-CLN-client", lndLspFunc, clnClientFunc) runTests(t, lndTestCases, "LND-lsp-LND-client", legacyOnionLndLspFunc, lndClientFunc) runTests(t, testCases, "CLN-lsp-CLN-client", clnLspFunc, clnClientFunc) } diff --git a/lnd/custom_msg_client.go b/lnd/custom_msg_client.go new file mode 100644 index 00000000..2f038780 --- /dev/null +++ b/lnd/custom_msg_client.go @@ -0,0 +1,149 @@ +package lnd + +import ( + "context" + "encoding/hex" + "fmt" + "log" + "sync" + "time" + + "github.com/breez/lspd/config" + "github.com/breez/lspd/lightning" + "github.com/lightningnetwork/lnd/lnrpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type CustomMsgClient struct { + lightning.CustomMsgClient + client *LndClient + initWg sync.WaitGroup + stopRequested bool + ctx context.Context + cancel context.CancelFunc + recvQueue chan *lightning.CustomMessage +} + +func NewCustomMsgClient(conf *config.ClnConfig, client *LndClient) *CustomMsgClient { + c := &CustomMsgClient{ + client: client, + recvQueue: make(chan *lightning.CustomMessage, 10000), + } + + c.initWg.Add(1) + return c +} + +func (c *CustomMsgClient) Start() error { + ctx, cancel := context.WithCancel(context.Background()) + c.ctx = ctx + c.cancel = cancel + c.stopRequested = false + return c.listen() +} + +func (i *CustomMsgClient) WaitStarted() { + i.initWg.Wait() +} + +func (i *CustomMsgClient) listen() error { + inited := false + + defer func() { + if !inited { + i.initWg.Done() + } + log.Printf("CLN custom msg listen(): stopping.") + }() + + for { + if i.ctx.Err() != nil { + return i.ctx.Err() + } + + log.Printf("Connecting LND custom msg stream.") + msgClient, err := i.client.client.SubscribeCustomMessages( + i.ctx, + &lnrpc.SubscribeCustomMessagesRequest{}, + ) + if err != nil { + log.Printf("client.SubscribeCustomMessages(): %v", err) + <-time.After(time.Second) + continue + } + + for { + if i.ctx.Err() != nil { + return i.ctx.Err() + } + + if !inited { + inited = true + i.initWg.Done() + } + + // Stop receiving if stop if requested. + if i.stopRequested { + return nil + } + + request, err := msgClient.Recv() + if err != nil { + // If it is just the error result of the context cancellation + // the we exit silently. + status, ok := status.FromError(err) + if ok && status.Code() == codes.Canceled { + log.Printf("Got code canceled. Break.") + break + } + + // Otherwise it an unexpected error, we log. + log.Printf("unexpected error in interceptor.Recv() %v", err) + break + } + + i.recvQueue <- &lightning.CustomMessage{ + PeerId: hex.EncodeToString(request.Peer), + Type: request.Type, + Data: request.Data, + } + } + + <-time.After(time.Second) + } +} + +func (c *CustomMsgClient) Recv() (*lightning.CustomMessage, error) { + select { + case msg := <-c.recvQueue: + return msg, nil + case <-c.ctx.Done(): + return nil, c.ctx.Err() + } +} + +func (c *CustomMsgClient) Send(msg *lightning.CustomMessage) error { + peerId, err := hex.DecodeString(msg.PeerId) + if err != nil { + return fmt.Errorf("hex.DecodeString(%s) err: %w", msg.PeerId, err) + } + _, err = c.client.client.SendCustomMessage( + c.ctx, + &lnrpc.SendCustomMessageRequest{ + Peer: peerId, + Type: msg.Type, + Data: msg.Data, + }, + ) + return err +} + +func (i *CustomMsgClient) Stop() error { + // Setting stopRequested to true will make the interceptor stop receiving. + i.stopRequested = true + + // Close the grpc connection. + i.cancel() + return nil +} diff --git a/lnd/interceptor.go b/lnd/interceptor.go index fae2083f..d689f00c 100644 --- a/lnd/interceptor.go +++ b/lnd/interceptor.go @@ -9,7 +9,6 @@ import ( "github.com/breez/lspd/common" "github.com/breez/lspd/config" - "github.com/breez/lspd/interceptor" "github.com/breez/lspd/lightning" "github.com/btcsuite/btcd/btcec/v2" sphinx "github.com/lightningnetwork/lightning-onion" @@ -24,7 +23,7 @@ import ( type LndHtlcInterceptor struct { fwsync *ForwardingHistorySync - interceptor *interceptor.Interceptor + interceptor common.InterceptHandler config *config.NodeConfig client *LndClient stopRequested bool @@ -38,7 +37,7 @@ func NewLndHtlcInterceptor( conf *config.NodeConfig, client *LndClient, fwsync *ForwardingHistorySync, - interceptor *interceptor.Interceptor, + interceptor common.InterceptHandler, ) (*LndHtlcInterceptor, error) { i := &LndHtlcInterceptor{ config: conf, diff --git a/main.go b/main.go index 15f40b50..519a85df 100644 --- a/main.go +++ b/main.go @@ -111,6 +111,25 @@ func main() { var interceptors []interceptor.HtlcInterceptor for _, node := range nodes { var htlcInterceptor interceptor.HtlcInterceptor + lsps2Config := &lsps2.InterceptorConfig{ + NodeId: node.NodeId, + ChainHash: node.ChainHash, + AdditionalChannelCapacitySat: uint64(node.NodeConfig.AdditionalChannelCapacity), + MinConfs: node.NodeConfig.MinConfs, + TargetConf: node.NodeConfig.TargetConf, + FeeStrategy: feeStrategy, + MinPaymentSizeMsat: node.NodeConfig.MinPaymentSizeMsat, + MaxPaymentSizeMsat: node.NodeConfig.MaxPaymentSizeMsat, + TimeLockDelta: node.NodeConfig.TimeLockDelta, + HtlcMinimumMsat: node.NodeConfig.MinHtlcMsat, + MppTimeout: time.Second * 90, + } + msgServer := lsps0.NewServer() + protocolServer := lsps0.NewProtocolServer([]uint32{2}) + lsps2Server := lsps2.NewLsps2Server(openingService, nodesService, node, lsps2Store) + lsps0.RegisterProtocolServer(msgServer, protocolServer) + lsps2.RegisterLsps2Server(msgServer, lsps2Server) + if node.NodeConfig.Lnd != nil { client, err := lnd.NewLndClient(node.NodeConfig.Lnd) if err != nil { @@ -119,11 +138,20 @@ func main() { client.StartListeners() fwsync := lnd.NewForwardingHistorySync(client, interceptStore, forwardingStore) - interceptor := interceptor.NewInterceptHandler(client, node, interceptStore, openingService, feeEstimator, feeStrategy, notificationService) - htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, fwsync, interceptor) + legacyHandler := interceptor.NewInterceptHandler(client, node, interceptStore, openingService, feeEstimator, feeStrategy, notificationService) + lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, openingService, client, feeEstimator, lsps2Config) + go lsps2Handler.Start(ctx) + combinedHandler := common.NewCombinedHandler(lsps2Handler, legacyHandler) + htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, fwsync, combinedHandler) if err != nil { log.Fatalf("failed to initialize LND interceptor: %v", err) } + + msgClient := lnd.NewCustomMsgClient(node.NodeConfig.Cln, client) + go msgClient.Start() + msgClient.WaitStarted() + defer msgClient.Stop() + go msgServer.Serve(msgClient) } if node.NodeConfig.Cln != nil { @@ -133,19 +161,7 @@ func main() { } legacyHandler := interceptor.NewInterceptHandler(client, node, interceptStore, openingService, feeEstimator, feeStrategy, notificationService) - lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, openingService, client, feeEstimator, &lsps2.InterceptorConfig{ - NodeId: node.NodeId, - ChainHash: node.ChainHash, - AdditionalChannelCapacitySat: uint64(node.NodeConfig.AdditionalChannelCapacity), - MinConfs: node.NodeConfig.MinConfs, - TargetConf: node.NodeConfig.TargetConf, - FeeStrategy: feeStrategy, - MinPaymentSizeMsat: node.NodeConfig.MinPaymentSizeMsat, - MaxPaymentSizeMsat: node.NodeConfig.MaxPaymentSizeMsat, - TimeLockDelta: node.NodeConfig.TimeLockDelta, - HtlcMinimumMsat: node.NodeConfig.MinHtlcMsat, - MppTimeout: time.Second * 90, - }) + lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, openingService, client, feeEstimator, lsps2Config) go lsps2Handler.Start(ctx) combinedHandler := common.NewCombinedHandler(lsps2Handler, legacyHandler) htlcInterceptor, err = cln.NewClnHtlcInterceptor(node.NodeConfig, client, combinedHandler) @@ -155,11 +171,6 @@ func main() { msgClient := cln.NewCustomMsgClient(node.NodeConfig.Cln, client) go msgClient.Start() - msgServer := lsps0.NewServer() - protocolServer := lsps0.NewProtocolServer([]uint32{2}) - lsps2Server := lsps2.NewLsps2Server(openingService, nodesService, node, lsps2Store) - lsps0.RegisterProtocolServer(msgServer, protocolServer) - lsps2.RegisterLsps2Server(msgServer, lsps2Server) msgClient.WaitStarted() defer msgClient.Stop() go msgServer.Serve(msgClient)