From fb306212190247820d0b99a6716fcffc0f9966ea Mon Sep 17 00:00:00 2001 From: Gabe Conradi Date: Fri, 10 Mar 2017 15:12:22 -0500 Subject: [PATCH 1/8] remove explicit commit when indexing docs in solr, use softAutoCommit instead --- app/collins/solr/SolrConfig.scala | 1 + app/collins/solr/SolrHelper.scala | 29 +++++++++++++------- app/collins/solr/SolrUpdater.scala | 4 ++- conf/solr/cores/collins/conf/solrconfig.xml | 30 ++++++++++++++------- 4 files changed, 44 insertions(+), 20 deletions(-) diff --git a/app/collins/solr/SolrConfig.scala b/app/collins/solr/SolrConfig.scala index 4fd483178..bbaf84651 100644 --- a/app/collins/solr/SolrConfig.scala +++ b/app/collins/solr/SolrConfig.scala @@ -32,6 +32,7 @@ object SolrConfig extends Configurable { def socketTimeout = getInt("socketTimeout", 1000) def connectionTimeout = getInt("connectionTimeout", 5000) def maxTotalConnections = getInt("maxTotalConnections", 100) + def commitWithin = getInt("commitWithinMs", 1000) def defaultMaxConnectionsPerHost = getInt("defaultMaxConnectionsPerHost", 100) def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 10) milliseconds diff --git a/app/collins/solr/SolrHelper.scala b/app/collins/solr/SolrHelper.scala index 344e0bb83..aa3a2fc0e 100644 --- a/app/collins/solr/SolrHelper.scala +++ b/app/collins/solr/SolrHelper.scala @@ -102,7 +102,7 @@ object SolrHelper { logger.debug("Populating Asset Logs") val num = assets.map { asset => val logs = AssetLog.findByAsset(asset) - updateAssetLogs(logs, indexTime, false) + updateAssetLogs(logs, indexTime) logs.size }.sum _server.foreach { _.commit() } @@ -111,14 +111,20 @@ object SolrHelper { }.getOrElse(logger.warn("attempted to populate solr when no server was initialized")) } - def updateItems[T](items: Seq[T], serializer: SolrSerializer[T], indexTime: Date, commit: Boolean = true) { + def updateItems[T](items: Seq[T], serializer: SolrSerializer[T], indexTime: Date) = { _server.map { server => val docs = items.map { item => prepForInsertion(serializer.serialize(item, indexTime)) } if (docs.size > 0) { val fuckingJava = new java.util.ArrayList[SolrInputDocument] docs.foreach { doc => fuckingJava.add(doc) } + + // old synchronous commit calls + /* server.add(fuckingJava) - if (commit) { + logger.error(("Added %d %s documents to be indexed".format( + fuckingJava.size, + serializer.docType.name.toLowerCase, items.head.toString))) + if (true) { server.commit() if (items.size == 1) { logger.debug(("Indexed %s: %s".format(serializer.docType.name.toLowerCase, items.head.toString))) @@ -126,6 +132,15 @@ object SolrHelper { logger.info("Indexed %d %ss".format(docs.size, serializer.docType.name.toLowerCase)) } } + */ + + // new commitWithin to lighten load on solr + server.add(fuckingJava, SolrConfig.commitWithin) + logger.error(("Added %d %s documents to be indexed within %d ms".format( + fuckingJava.size, + serializer.docType.name.toLowerCase, + SolrConfig.commitWithin))) + // dont explicitly commit, let solr figure it out } else { logger.warn("No items to index!") } @@ -133,13 +148,9 @@ object SolrHelper { } - def updateAssets(assets: Seq[Asset], indexTime: Date, commit: Boolean = true) { - updateItems[Asset](assets, AssetSerializer, indexTime, commit) - } + def updateAssets(assets: Seq[Asset], indexTime: Date) = updateItems[Asset](assets, AssetSerializer, indexTime) - def updateAssetLogs(logs: Seq[AssetLog], indexTime: Date, commit: Boolean = true) { - updateItems[AssetLog](logs, AssetLogSerializer, indexTime, commit) - } + def updateAssetLogs(logs: Seq[AssetLog], indexTime: Date) = updateItems[AssetLog](logs, AssetLogSerializer, indexTime) def terminateSolr() { _server.foreach { diff --git a/app/collins/solr/SolrUpdater.scala b/app/collins/solr/SolrUpdater.scala index 6392e181b..2af7bbd97 100644 --- a/app/collins/solr/SolrUpdater.scala +++ b/app/collins/solr/SolrUpdater.scala @@ -66,8 +66,10 @@ class AssetSolrUpdater extends Actor { class AssetLogSolrUpdater extends Actor { + // TODO(gabe): perform batch updating on asset logs as well to prevent as much churn in solr def receive = { - case log: AssetLog => SolrHelper.updateAssetLogs(List(log), new Date) + case log: AssetLog => + SolrHelper.updateAssetLogs(List(log), new Date) } } diff --git a/conf/solr/cores/collins/conf/solrconfig.xml b/conf/solr/cores/collins/conf/solrconfig.xml index 573ca8a6c..46d1463b7 100644 --- a/conf/solr/cores/collins/conf/solrconfig.xml +++ b/conf/solr/cores/collins/conf/solrconfig.xml @@ -181,22 +181,32 @@ If the updateLog is enabled, then it's highly recommended to have some sort of hard autoCommit to limit the log size. --> - - ${solr.autoCommit.maxTime:15000} - false + + ${solr.autoCommit.maxTime:15000} + ${solr.autoCommit.maxDocs:100} + false - - - ${solr.autoSoftCommit.maxTime:-1} + + + + ${solr.autoSoftCommit.maxTime:1000} - + From 12db23ccd835fe45a8a10c64641b17d3197979dc Mon Sep 17 00:00:00 2001 From: Gabe Conradi Date: Fri, 10 Mar 2017 15:26:02 -0500 Subject: [PATCH 2/8] batch updates of logs --- app/collins/solr/SolrUpdater.scala | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/app/collins/solr/SolrUpdater.scala b/app/collins/solr/SolrUpdater.scala index 2af7bbd97..d9ecffd72 100644 --- a/app/collins/solr/SolrUpdater.scala +++ b/app/collins/solr/SolrUpdater.scala @@ -29,7 +29,7 @@ class AssetSolrUpdater extends Actor { new ConcurrentHashMap[String, java.lang.Boolean]()) private[this] val assetTagsRef = new AtomicReference(newAssetTagSet) - private[this] val logger = Logger("SolrUpdater") + private[this] val logger = Logger("AssetSolrUpdater") //mutex to prevent multiple concurrent scheduler calls val scheduled = new AtomicBoolean(false) @@ -66,10 +66,30 @@ class AssetSolrUpdater extends Actor { class AssetLogSolrUpdater extends Actor { + private[this] def newLogSet = Collections.newSetFromMap[AssetLog]( + new ConcurrentHashMap[AssetLog, java.lang.Boolean]()) + private[this] val logsRef = new AtomicReference(newLogSet) + private[this] val logger = Logger("AssetLogSolrUpdater") + val scheduled = new AtomicBoolean(false) + case object Reindex + // TODO(gabe): perform batch updating on asset logs as well to prevent as much churn in solr def receive = { case log: AssetLog => - SolrHelper.updateAssetLogs(List(log), new Date) + logsRef.get.add(log) + if (scheduled.compareAndSet(false, true)) { + logger.debug("Scheduling reindex of log %d within %s".format(log.id, SolrConfig.assetBatchUpdateWindow)) + context.system.scheduler.scheduleOnce(SolrConfig.assetBatchUpdateWindow, self, Reindex) + } else { + logger.trace("Ignoring already scheduled reindex of log %d".format(log.id)) + } + case Reindex => + if (scheduled.get == true) { + val logs = logsRef.getAndSet(newLogSet).asScala.toSeq + logger.debug("Got Reindex task, working on %d logs".format(logs.size)) + SolrHelper.updateAssetLogs(logs, new Date) + scheduled.set(false) + } } } From 3879321eb9725ff3029ef3666f60616a69e393a6 Mon Sep 17 00:00:00 2001 From: Gabe Conradi Date: Fri, 10 Mar 2017 15:26:41 -0500 Subject: [PATCH 3/8] bump assetBatchUpdateWindowMs to 10ms to prevent thrashing as much --- app/collins/solr/SolrConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/collins/solr/SolrConfig.scala b/app/collins/solr/SolrConfig.scala index bbaf84651..3de746889 100644 --- a/app/collins/solr/SolrConfig.scala +++ b/app/collins/solr/SolrConfig.scala @@ -34,7 +34,7 @@ object SolrConfig extends Configurable { def maxTotalConnections = getInt("maxTotalConnections", 100) def commitWithin = getInt("commitWithinMs", 1000) def defaultMaxConnectionsPerHost = getInt("defaultMaxConnectionsPerHost", 100) - def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 10) milliseconds + def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 100) milliseconds override protected def validateConfig() { if (enabled) { From 8d766bffd442567ec981213c8c82081d01219d49 Mon Sep 17 00:00:00 2001 From: Gabe Conradi Date: Fri, 10 Mar 2017 15:52:50 -0500 Subject: [PATCH 4/8] make commit no fsync, no searcherwait, yes softcommit explicit --- app/collins/solr/SolrConfig.scala | 2 +- app/collins/solr/SolrHelper.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/app/collins/solr/SolrConfig.scala b/app/collins/solr/SolrConfig.scala index 3de746889..a23c7d0fb 100644 --- a/app/collins/solr/SolrConfig.scala +++ b/app/collins/solr/SolrConfig.scala @@ -32,7 +32,7 @@ object SolrConfig extends Configurable { def socketTimeout = getInt("socketTimeout", 1000) def connectionTimeout = getInt("connectionTimeout", 5000) def maxTotalConnections = getInt("maxTotalConnections", 100) - def commitWithin = getInt("commitWithinMs", 1000) + def commitWithin = getInt("commitWithinMs", 200) def defaultMaxConnectionsPerHost = getInt("defaultMaxConnectionsPerHost", 100) def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 100) milliseconds diff --git a/app/collins/solr/SolrHelper.scala b/app/collins/solr/SolrHelper.scala index aa3a2fc0e..389533ebc 100644 --- a/app/collins/solr/SolrHelper.scala +++ b/app/collins/solr/SolrHelper.scala @@ -140,7 +140,9 @@ object SolrHelper { fuckingJava.size, serializer.docType.name.toLowerCase, SolrConfig.commitWithin))) - // dont explicitly commit, let solr figure it out + // dont explicitly hard commit, let solr figure it out and make docs available + // to be searched ASAP. commit(boolean waitFlush, boolean waitSearcher, boolean softCommit) + server.commit(false, false, true) } else { logger.warn("No items to index!") } From 7e5ecb4259a4ddb6df677895e08a0898f5a8baa8 Mon Sep 17 00:00:00 2001 From: Gabe Conradi Date: Fri, 10 Mar 2017 16:20:19 -0500 Subject: [PATCH 5/8] bound default indexing window to 80ms --- app/collins/models/AssetLog.scala | 4 ++++ app/collins/solr/SolrConfig.scala | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/app/collins/models/AssetLog.scala b/app/collins/models/AssetLog.scala index 2e94fe310..d1f4f1b69 100644 --- a/app/collins/models/AssetLog.scala +++ b/app/collins/models/AssetLog.scala @@ -87,6 +87,10 @@ case class AssetLog( @Transient lazy val assetTag: String = asset.tag + // TODO(gabe): if this log is indexed (serialized first) before solr is + // updated with the asset document, this will throw! This can happen when + // creating a new asset then immediately performing some attribute sets + // which create logs. @Transient lazy val asset: Asset = Asset.findById(assetId).get diff --git a/app/collins/solr/SolrConfig.scala b/app/collins/solr/SolrConfig.scala index a23c7d0fb..108db0034 100644 --- a/app/collins/solr/SolrConfig.scala +++ b/app/collins/solr/SolrConfig.scala @@ -32,9 +32,9 @@ object SolrConfig extends Configurable { def socketTimeout = getInt("socketTimeout", 1000) def connectionTimeout = getInt("connectionTimeout", 5000) def maxTotalConnections = getInt("maxTotalConnections", 100) - def commitWithin = getInt("commitWithinMs", 200) + def commitWithin = getInt("commitWithinMs", 50) def defaultMaxConnectionsPerHost = getInt("defaultMaxConnectionsPerHost", 100) - def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 100) milliseconds + def assetBatchUpdateWindow = getInt("assetBatchUpdateWindowMs", 30) milliseconds override protected def validateConfig() { if (enabled) { From 4574e4b4c7a9aa2cbe8707a9eea4bea75781a473 Mon Sep 17 00:00:00 2001 From: Gabe Conradi Date: Fri, 10 Mar 2017 16:31:10 -0500 Subject: [PATCH 6/8] remove old call to commit --- app/collins/solr/SolrHelper.scala | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/app/collins/solr/SolrHelper.scala b/app/collins/solr/SolrHelper.scala index 389533ebc..a7cd9e01a 100644 --- a/app/collins/solr/SolrHelper.scala +++ b/app/collins/solr/SolrHelper.scala @@ -117,24 +117,6 @@ object SolrHelper { if (docs.size > 0) { val fuckingJava = new java.util.ArrayList[SolrInputDocument] docs.foreach { doc => fuckingJava.add(doc) } - - // old synchronous commit calls - /* - server.add(fuckingJava) - logger.error(("Added %d %s documents to be indexed".format( - fuckingJava.size, - serializer.docType.name.toLowerCase, items.head.toString))) - if (true) { - server.commit() - if (items.size == 1) { - logger.debug(("Indexed %s: %s".format(serializer.docType.name.toLowerCase, items.head.toString))) - } else { - logger.info("Indexed %d %ss".format(docs.size, serializer.docType.name.toLowerCase)) - } - } - */ - - // new commitWithin to lighten load on solr server.add(fuckingJava, SolrConfig.commitWithin) logger.error(("Added %d %s documents to be indexed within %d ms".format( fuckingJava.size, From 4748453f3c84fb3b58276fcfbb0287f9a85c765d Mon Sep 17 00:00:00 2001 From: Gabe Conradi Date: Fri, 10 Mar 2017 16:34:00 -0500 Subject: [PATCH 7/8] use debug not error --- app/collins/solr/SolrHelper.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/collins/solr/SolrHelper.scala b/app/collins/solr/SolrHelper.scala index a7cd9e01a..848cb1c77 100644 --- a/app/collins/solr/SolrHelper.scala +++ b/app/collins/solr/SolrHelper.scala @@ -118,10 +118,10 @@ object SolrHelper { val fuckingJava = new java.util.ArrayList[SolrInputDocument] docs.foreach { doc => fuckingJava.add(doc) } server.add(fuckingJava, SolrConfig.commitWithin) - logger.error(("Added %d %s documents to be indexed within %d ms".format( + logger.debug("Added %d %s documents to be indexed within %d ms".format( fuckingJava.size, serializer.docType.name.toLowerCase, - SolrConfig.commitWithin))) + SolrConfig.commitWithin)) // dont explicitly hard commit, let solr figure it out and make docs available // to be searched ASAP. commit(boolean waitFlush, boolean waitSearcher, boolean softCommit) server.commit(false, false, true) From ca5d87a4fbacb476b65a7c4f2c69196a4ab06706 Mon Sep 17 00:00:00 2001 From: Gabe Conradi Date: Fri, 21 Apr 2017 21:20:19 +0200 Subject: [PATCH 8/8] fix docs some --- app/collins/models/AssetLog.scala | 1 + app/collins/solr/SolrUpdater.scala | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/app/collins/models/AssetLog.scala b/app/collins/models/AssetLog.scala index d1f4f1b69..03e78482a 100644 --- a/app/collins/models/AssetLog.scala +++ b/app/collins/models/AssetLog.scala @@ -91,6 +91,7 @@ case class AssetLog( // updated with the asset document, this will throw! This can happen when // creating a new asset then immediately performing some attribute sets // which create logs. + // See https://github.com/tumblr/collins/issues/528 @Transient lazy val asset: Asset = Asset.findById(assetId).get diff --git a/app/collins/solr/SolrUpdater.scala b/app/collins/solr/SolrUpdater.scala index d9ecffd72..6de6610c5 100644 --- a/app/collins/solr/SolrUpdater.scala +++ b/app/collins/solr/SolrUpdater.scala @@ -73,7 +73,6 @@ class AssetLogSolrUpdater extends Actor { val scheduled = new AtomicBoolean(false) case object Reindex - // TODO(gabe): perform batch updating on asset logs as well to prevent as much churn in solr def receive = { case log: AssetLog => logsRef.get.add(log)