Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NetworkDb: remove stale channels in batch #886

Merged
merged 10 commits into from
Mar 12, 2019
11 changes: 6 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/NetworkDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ trait NetworkDb {

def addChannel(c: ChannelAnnouncement, txid: BinaryData, capacity: Satoshi)

def removeChannel(shortChannelId: ShortChannelId) = removeChannels(Seq(shortChannelId))

/**
* This method removes 1 channel announcement and 2 channel updates (at both ends of the same channel)
* This method removes channel announcements and associated channel updates for a list of channel ids
*
* @param shortChannelId
* @return
* @param shortChannelIds list of short channel ids
*/
def removeChannel(shortChannelId: ShortChannelId)
def removeChannels(shortChannelIds: Iterable[ShortChannelId])

def listChannels(): Map[ChannelAnnouncement, (BinaryData, Satoshi)]

Expand All @@ -49,7 +50,7 @@ trait NetworkDb {

def listChannelUpdates(): Seq[ChannelUpdate]

def addToPruned(shortChannelId: ShortChannelId)
def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit

def removeFromPruned(shortChannelId: ShortChannelId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,20 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb {
}
}

override def removeChannel(shortChannelId: ShortChannelId): Unit = {
using(sqlite.createStatement) { statement =>
statement.execute("BEGIN TRANSACTION")
statement.executeUpdate(s"DELETE FROM channel_updates WHERE short_channel_id=${shortChannelId.toLong}")
statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id=${shortChannelId.toLong}")
statement.execute("COMMIT TRANSACTION")
override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = {

def removeChannelsInternal(shortChannelIds: Iterable[ShortChannelId]): Unit = {
val ids = shortChannelIds.map(_.toLong).mkString(",")
using(sqlite.createStatement) { statement =>
statement.execute("BEGIN TRANSACTION")
statement.executeUpdate(s"DELETE FROM channel_updates WHERE short_channel_id IN ($ids)")
statement.executeUpdate(s"DELETE FROM channels WHERE short_channel_id IN ($ids)")
statement.execute("COMMIT TRANSACTION")
}
}

// remove channels by batch of 1000
shortChannelIds.grouped(1000).foreach(removeChannelsInternal)
}

override def listChannels(): Map[ChannelAnnouncement, (BinaryData, Satoshi)] = {
Expand Down Expand Up @@ -129,10 +136,13 @@ class SqliteNetworkDb(sqlite: Connection) extends NetworkDb {
}
}

override def addToPruned(shortChannelId: ShortChannelId): Unit = {
using(sqlite.prepareStatement("INSERT OR IGNORE INTO pruned VALUES (?)")) { statement =>
statement.setLong(1, shortChannelId.toLong)
statement.executeUpdate()
override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = {
using(sqlite.prepareStatement("INSERT OR IGNORE INTO pruned VALUES (?)"), disableAutoCommit = true) { statement =>
shortChannelIds.foreach(shortChannelId => {
statement.setLong(1, shortChannelId.toLong)
statement.addBatch()
})
statement.executeBatch()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,11 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
val staleNodes = potentialStaleNodes.filterNot(nodeId => hasChannels(nodeId, channels1.values))

// let's clean the db and send the events
db.removeChannels(staleChannels) // NB: this also removes channel updates
// we keep track of recently pruned channels so we don't revalidate them (zombie churn)
db.addToPruned(staleChannels)
staleChannels.foreach { shortChannelId =>
log.info("pruning shortChannelId={} (stale)", shortChannelId)
db.removeChannel(shortChannelId) // NB: this also removes channel updates
// we keep track of recently pruned channels so we don't revalidate them (zombie churn)
db.addToPruned(shortChannelId)
context.system.eventStream.publish(ChannelLost(shortChannelId))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.net.{InetAddress, InetSocketAddress}
import java.sql.DriverManager

import com.google.common.net.HostAndPort
import fr.acinq.bitcoin.{Block, Crypto, Satoshi}
import fr.acinq.bitcoin.{BinaryData, Block, Crypto, Satoshi}
import fr.acinq.eclair.db.sqlite.SqliteNetworkDb
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.wire.{Color, NodeAddress, Tor2}
Expand All @@ -33,6 +33,8 @@ class SqliteNetworkDbSpec extends FunSuite {

def inmem = DriverManager.getConnection("jdbc:sqlite::memory:")

val shortChannelIds = (42 to (5000 + 42)).map(i => ShortChannelId(i))

test("init sqlite 2 times in a row") {
val sqlite = inmem
val db1 = new SqliteNetworkDb(sqlite)
Expand Down Expand Up @@ -104,19 +106,36 @@ class SqliteNetworkDbSpec extends FunSuite {
db.updateChannelUpdate(channel_update_1)
}

test("add/remove/test pruned channels") {
test("remove many channels") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this test so much more complicated?

val sqlite = inmem
val db = new SqliteNetworkDb(sqlite)
val sig = Crypto.encodeSignature(Crypto.sign(randomKey.toBin, randomKey)) :+ 1.toByte
val priv = randomKey
val pub = priv.publicKey
val capacity = Satoshi(10000)

db.addToPruned(ShortChannelId(1))
db.addToPruned(ShortChannelId(5))
val channels = shortChannelIds.map(id => Announcements.makeChannelAnnouncement(Block.RegtestGenesisBlock.hash, id, pub, pub, pub, pub, sig, sig, sig, sig))
val template = Announcements.makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv, pub, ShortChannelId(42), 5, 7000000, 50000, 100, 500000000L, true)
val updates = shortChannelIds.map(id => template.copy(shortChannelId = id))
val txid = BinaryData("ab" * 32)
channels.foreach(ca => db.addChannel(ca, txid, capacity))
updates.foreach(u => db.addChannelUpdate(u))
assert(db.listChannels().keySet === channels.toSet)
assert(db.listChannelUpdates() === updates)

val toDelete = channels.map(_.shortChannelId).drop(500).take(2500)
db.removeChannels(toDelete)
assert(db.listChannels().keySet === channels.filterNot(a => toDelete.contains(a.shortChannelId)).toSet)
assert(db.listChannelUpdates().toSet === updates.filterNot(u => toDelete.contains(u.shortChannelId)).toSet)
}

assert(db.isPruned(ShortChannelId(1)))
assert(!db.isPruned(ShortChannelId(3)))
assert(db.isPruned(ShortChannelId(1)))
test("prune many channels") {
val sqlite = inmem
val db = new SqliteNetworkDb(sqlite)

db.addToPruned(shortChannelIds)
shortChannelIds.foreach { id => assert(db.isPruned((id))) }
db.removeFromPruned(ShortChannelId(5))
assert(!db.isPruned(ShortChannelId(5)))
}

}