Skip to content

Commit

Permalink
lsps2: lnd support
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Jan 5, 2024
1 parent 43b2c52 commit e669d4e
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 31 deletions.
1 change: 1 addition & 0 deletions .github/workflows/integration_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ jobs:
testLsps2ZeroConfUtxo
]
lsp: [
LND,
CLN
]
client: [
Expand Down
66 changes: 59 additions & 7 deletions itest/lnd_breez_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package itest

import (
"context"
"encoding/hex"
"flag"
"log"
"sync"
"time"

"github.com/breez/lntest"
"github.com/breez/lntest/lnd"
Expand All @@ -15,11 +18,12 @@ var lndMobileExecutable = flag.String(
)

type lndBreezClient struct {
name string
harness *lntest.TestHarness
node *lntest.LndNode
cancel context.CancelFunc
mtx sync.Mutex
name string
harness *lntest.TestHarness
node *lntest.LndNode
customMsgQueue chan *lntest.CustomMsgRequest
cancel context.CancelFunc
mtx sync.Mutex
}

func newLndBreezClient(h *lntest.TestHarness, m *lntest.Miner, name string) BreezClient {
Expand Down Expand Up @@ -63,6 +67,8 @@ func (c *lndBreezClient) Start() {
ctx, cancel := context.WithCancel(c.harness.Ctx)
c.cancel = cancel
go c.startChannelAcceptor(ctx)
c.customMsgQueue = make(chan *lntest.CustomMsgRequest, 100)
c.startCustomMsgListener(ctx)
}

func (c *lndBreezClient) Stop() error {
Expand All @@ -86,9 +92,51 @@ func (c *lndBreezClient) SetHtlcAcceptor(totalMsat uint64) {
// No need for a htlc acceptor in the LND breez client
}

func (c *lndBreezClient) startCustomMsgListener(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
}

if !c.node.IsStarted() {
log.Printf("%s: cannot listen to custom messages, node is not started.", c.name)
break
}

listener, err := c.node.LightningClient().SubscribeCustomMessages(
ctx,
&lnd.SubscribeCustomMessagesRequest{},
)
if err != nil {
log.Printf("%s: client.SubscribeCustomMessages() error: %v", c.name, err)
break
}
for {
if ctx.Err() != nil {
return
}
msg, err := listener.Recv()
if err != nil {
log.Printf("%s: listener.Recv() error: %v", c.name, err)
break
}

c.customMsgQueue <- &lntest.CustomMsgRequest{
PeerId: hex.EncodeToString(msg.Peer),
Type: msg.Type,
Data: msg.Data,
}
}
}
}()
}

func (c *lndBreezClient) ReceiveCustomMessage() *lntest.CustomMsgRequest {
// TODO: Not implemented.
return nil
msg := <-c.customMsgQueue
return msg
}

func (c *lndBreezClient) startChannelAcceptor(ctx context.Context) error {
Expand All @@ -98,6 +146,10 @@ func (c *lndBreezClient) startChannelAcceptor(ctx context.Context) error {
}

for {
if ctx.Err() != nil {
return ctx.Err()
}

request, err := client.Recv()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion itest/lspd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestLspd(t *testing.T) {
lndTestCases = append(lndTestCases, c)
}
}
runTests(t, lndTestCases, "LND-lsp-CLN-client", lndLspFunc, clnClientFunc)
runTests(t, testCases, "LND-lsp-CLN-client", lndLspFunc, clnClientFunc)
runTests(t, lndTestCases, "LND-lsp-LND-client", legacyOnionLndLspFunc, lndClientFunc)
runTests(t, testCases, "CLN-lsp-CLN-client", clnLspFunc, clnClientFunc)
}
Expand Down
149 changes: 149 additions & 0 deletions lnd/custom_msg_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package lnd

import (
"context"
"encoding/hex"
"fmt"
"log"
"sync"
"time"

"github.com/breez/lspd/config"
"github.com/breez/lspd/lightning"
"github.com/lightningnetwork/lnd/lnrpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type CustomMsgClient struct {
lightning.CustomMsgClient
client *LndClient
initWg sync.WaitGroup
stopRequested bool
ctx context.Context
cancel context.CancelFunc
recvQueue chan *lightning.CustomMessage
}

func NewCustomMsgClient(conf *config.ClnConfig, client *LndClient) *CustomMsgClient {
c := &CustomMsgClient{
client: client,
recvQueue: make(chan *lightning.CustomMessage, 10000),
}

c.initWg.Add(1)
return c
}

func (c *CustomMsgClient) Start() error {
ctx, cancel := context.WithCancel(context.Background())
c.ctx = ctx
c.cancel = cancel
c.stopRequested = false
return c.listen()
}

func (i *CustomMsgClient) WaitStarted() {
i.initWg.Wait()
}

func (i *CustomMsgClient) listen() error {
inited := false

defer func() {
if !inited {
i.initWg.Done()
}
log.Printf("CLN custom msg listen(): stopping.")
}()

for {
if i.ctx.Err() != nil {
return i.ctx.Err()
}

log.Printf("Connecting LND custom msg stream.")
msgClient, err := i.client.client.SubscribeCustomMessages(
i.ctx,
&lnrpc.SubscribeCustomMessagesRequest{},
)
if err != nil {
log.Printf("client.SubscribeCustomMessages(): %v", err)
<-time.After(time.Second)
continue
}

for {
if i.ctx.Err() != nil {
return i.ctx.Err()
}

if !inited {
inited = true
i.initWg.Done()
}

// Stop receiving if stop if requested.
if i.stopRequested {
return nil
}

request, err := msgClient.Recv()
if err != nil {
// If it is just the error result of the context cancellation
// the we exit silently.
status, ok := status.FromError(err)
if ok && status.Code() == codes.Canceled {
log.Printf("Got code canceled. Break.")
break
}

// Otherwise it an unexpected error, we log.
log.Printf("unexpected error in interceptor.Recv() %v", err)
break
}

i.recvQueue <- &lightning.CustomMessage{
PeerId: hex.EncodeToString(request.Peer),
Type: request.Type,
Data: request.Data,
}
}

<-time.After(time.Second)
}
}

func (c *CustomMsgClient) Recv() (*lightning.CustomMessage, error) {
select {
case msg := <-c.recvQueue:
return msg, nil
case <-c.ctx.Done():
return nil, c.ctx.Err()
}
}

func (c *CustomMsgClient) Send(msg *lightning.CustomMessage) error {
peerId, err := hex.DecodeString(msg.PeerId)
if err != nil {
return fmt.Errorf("hex.DecodeString(%s) err: %w", msg.PeerId, err)
}
_, err = c.client.client.SendCustomMessage(
c.ctx,
&lnrpc.SendCustomMessageRequest{
Peer: peerId,
Type: msg.Type,
Data: msg.Data,
},
)
return err
}

func (i *CustomMsgClient) Stop() error {
// Setting stopRequested to true will make the interceptor stop receiving.
i.stopRequested = true

// Close the grpc connection.
i.cancel()
return nil
}
5 changes: 2 additions & 3 deletions lnd/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/breez/lspd/common"
"github.com/breez/lspd/config"
"github.com/breez/lspd/interceptor"
"github.com/breez/lspd/lightning"
"github.com/btcsuite/btcd/btcec/v2"
sphinx "github.com/lightningnetwork/lightning-onion"
Expand All @@ -24,7 +23,7 @@ import (

type LndHtlcInterceptor struct {
fwsync *ForwardingHistorySync
interceptor *interceptor.Interceptor
interceptor common.InterceptHandler
config *config.NodeConfig
client *LndClient
stopRequested bool
Expand All @@ -38,7 +37,7 @@ func NewLndHtlcInterceptor(
conf *config.NodeConfig,
client *LndClient,
fwsync *ForwardingHistorySync,
interceptor *interceptor.Interceptor,
interceptor common.InterceptHandler,
) (*LndHtlcInterceptor, error) {
i := &LndHtlcInterceptor{
config: conf,
Expand Down
51 changes: 31 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,25 @@ func main() {
var interceptors []interceptor.HtlcInterceptor
for _, node := range nodes {
var htlcInterceptor interceptor.HtlcInterceptor
lsps2Config := &lsps2.InterceptorConfig{
NodeId: node.NodeId,
ChainHash: node.ChainHash,
AdditionalChannelCapacitySat: uint64(node.NodeConfig.AdditionalChannelCapacity),
MinConfs: node.NodeConfig.MinConfs,
TargetConf: node.NodeConfig.TargetConf,
FeeStrategy: feeStrategy,
MinPaymentSizeMsat: node.NodeConfig.MinPaymentSizeMsat,
MaxPaymentSizeMsat: node.NodeConfig.MaxPaymentSizeMsat,
TimeLockDelta: node.NodeConfig.TimeLockDelta,
HtlcMinimumMsat: node.NodeConfig.MinHtlcMsat,
MppTimeout: time.Second * 90,
}
msgServer := lsps0.NewServer()
protocolServer := lsps0.NewProtocolServer([]uint32{2})
lsps2Server := lsps2.NewLsps2Server(openingService, nodesService, node, lsps2Store)
lsps0.RegisterProtocolServer(msgServer, protocolServer)
lsps2.RegisterLsps2Server(msgServer, lsps2Server)

if node.NodeConfig.Lnd != nil {
client, err := lnd.NewLndClient(node.NodeConfig.Lnd)
if err != nil {
Expand All @@ -119,11 +138,20 @@ func main() {

client.StartListeners()
fwsync := lnd.NewForwardingHistorySync(client, interceptStore, forwardingStore)
interceptor := interceptor.NewInterceptHandler(client, node, interceptStore, openingService, feeEstimator, feeStrategy, notificationService)
htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, fwsync, interceptor)
legacyHandler := interceptor.NewInterceptHandler(client, node, interceptStore, openingService, feeEstimator, feeStrategy, notificationService)
lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, openingService, client, feeEstimator, lsps2Config)
go lsps2Handler.Start(ctx)
combinedHandler := common.NewCombinedHandler(lsps2Handler, legacyHandler)
htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, fwsync, combinedHandler)
if err != nil {
log.Fatalf("failed to initialize LND interceptor: %v", err)
}

msgClient := lnd.NewCustomMsgClient(node.NodeConfig.Cln, client)
go msgClient.Start()
msgClient.WaitStarted()
defer msgClient.Stop()
go msgServer.Serve(msgClient)
}

if node.NodeConfig.Cln != nil {
Expand All @@ -133,19 +161,7 @@ func main() {
}

legacyHandler := interceptor.NewInterceptHandler(client, node, interceptStore, openingService, feeEstimator, feeStrategy, notificationService)
lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, openingService, client, feeEstimator, &lsps2.InterceptorConfig{
NodeId: node.NodeId,
ChainHash: node.ChainHash,
AdditionalChannelCapacitySat: uint64(node.NodeConfig.AdditionalChannelCapacity),
MinConfs: node.NodeConfig.MinConfs,
TargetConf: node.NodeConfig.TargetConf,
FeeStrategy: feeStrategy,
MinPaymentSizeMsat: node.NodeConfig.MinPaymentSizeMsat,
MaxPaymentSizeMsat: node.NodeConfig.MaxPaymentSizeMsat,
TimeLockDelta: node.NodeConfig.TimeLockDelta,
HtlcMinimumMsat: node.NodeConfig.MinHtlcMsat,
MppTimeout: time.Second * 90,
})
lsps2Handler := lsps2.NewInterceptHandler(lsps2Store, openingService, client, feeEstimator, lsps2Config)
go lsps2Handler.Start(ctx)
combinedHandler := common.NewCombinedHandler(lsps2Handler, legacyHandler)
htlcInterceptor, err = cln.NewClnHtlcInterceptor(node.NodeConfig, client, combinedHandler)
Expand All @@ -155,11 +171,6 @@ func main() {

msgClient := cln.NewCustomMsgClient(node.NodeConfig.Cln, client)
go msgClient.Start()
msgServer := lsps0.NewServer()
protocolServer := lsps0.NewProtocolServer([]uint32{2})
lsps2Server := lsps2.NewLsps2Server(openingService, nodesService, node, lsps2Store)
lsps0.RegisterProtocolServer(msgServer, protocolServer)
lsps2.RegisterLsps2Server(msgServer, lsps2Server)
msgClient.WaitStarted()
defer msgClient.Stop()
go msgServer.Serve(msgClient)
Expand Down

0 comments on commit e669d4e

Please sign in to comment.