diff --git a/src/Lucene.Net.TestFramework/Index/RandomDocumentsWriterPerThreadPool.cs b/src/Lucene.Net.TestFramework/Index/RandomDocumentsWriterPerThreadPool.cs deleted file mode 100644 index fc82ddb195..0000000000 --- a/src/Lucene.Net.TestFramework/Index/RandomDocumentsWriterPerThreadPool.cs +++ /dev/null @@ -1,102 +0,0 @@ -using Lucene.Net.Diagnostics; -using System; -using System.Threading; - -namespace Lucene.Net.Index -{ - /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - /// - /// A that selects thread states at random. - /// - /// @lucene.internal - /// @lucene.experimental - /// - internal class RandomDocumentsWriterPerThreadPool : DocumentsWriterPerThreadPool - { - private readonly ThreadState[] states; - private readonly Random random; - private readonly int maxRetry; - - public RandomDocumentsWriterPerThreadPool(int maxNumPerThreads, Random random) - : base(maxNumPerThreads) - { - if (Debugging.AssertsEnabled) Debugging.Assert(MaxThreadStates >= 1); - states = new ThreadState[maxNumPerThreads]; - this.random = new Random(random.Next()); - this.maxRetry = 1 + random.Next(10); - } - - public override ThreadState GetAndLock(Thread requestingThread, DocumentsWriter documentsWriter) - { - ThreadState threadState = null; - if (NumThreadStatesActive == 0) - { - lock (this) - { - if (NumThreadStatesActive == 0) - { - threadState = states[0] = NewThreadState(); - return threadState; - } - } - } - if (Debugging.AssertsEnabled) Debugging.Assert(NumThreadStatesActive > 0); - for (int i = 0; i < maxRetry; i++) - { - int ord = random.Next(NumThreadStatesActive); - lock (this) - { - threadState = states[ord]; - if (Debugging.AssertsEnabled) Debugging.Assert(threadState != null); - } - - if (threadState.TryLock()) - { - return threadState; - } - if (random.Next(20) == 0) - { - break; - } - } - /* - * only try to create a new threadstate if we can not lock the randomly - * selected state. this is important since some tests rely on a single - * threadstate in the single threaded case. Eventually it would be nice if - * we would not have this limitation but for now we just make sure we only - * allocate one threadstate if indexing is single threaded - */ - - lock (this) - { - ThreadState newThreadState = NewThreadState(); - if (newThreadState != null) // did we get a new state? - { - threadState = states[NumThreadStatesActive - 1] = newThreadState; - //if (Debugging.AssertsEnabled) Debugging.Assert(threadState.HeldByCurrentThread); - return threadState; - } - // if no new state is available lock the random one - } - if (Debugging.AssertsEnabled) Debugging.Assert(threadState != null); - threadState.@Lock(); - return threadState; - } - } -} \ No newline at end of file diff --git a/src/Lucene.Net.TestFramework/Lucene.Net.TestFramework.csproj b/src/Lucene.Net.TestFramework/Lucene.Net.TestFramework.csproj index ffe98902f2..266c75bbd2 100644 --- a/src/Lucene.Net.TestFramework/Lucene.Net.TestFramework.csproj +++ b/src/Lucene.Net.TestFramework/Lucene.Net.TestFramework.csproj @@ -129,4 +129,4 @@ - + \ No newline at end of file diff --git a/src/Lucene.Net.TestFramework/Util/LuceneTestCase.cs b/src/Lucene.Net.TestFramework/Util/LuceneTestCase.cs index 6460cbdcda..f584181855 100644 --- a/src/Lucene.Net.TestFramework/Util/LuceneTestCase.cs +++ b/src/Lucene.Net.TestFramework/Util/LuceneTestCase.cs @@ -1606,32 +1606,8 @@ public static IndexWriterConfig NewIndexWriterConfig(LuceneTestCase luceneTestCa { int maxNumThreadStates = Rarely(random) ? TestUtil.NextInt32(random, 5, 20) : TestUtil.NextInt32(random, 1, 4); // reasonable value - crazy value - if (Rarely(random)) - { - //// Retrieve the package-private setIndexerThreadPool - //// method: - ////MethodInfo setIndexerThreadPoolMethod = typeof(IndexWriterConfig).GetMethod("SetIndexerThreadPool", new Type[] { typeof(DocumentsWriterPerThreadPool) }); - //MethodInfo setIndexerThreadPoolMethod = typeof(IndexWriterConfig).GetMethod( - // "SetIndexerThreadPool", - // BindingFlags.NonPublic | BindingFlags.Instance, - // null, - // new Type[] { typeof(DocumentsWriterPerThreadPool) }, - // null); - ////setIndexerThreadPoolMethod.setAccessible(true); - //Type clazz = typeof(RandomDocumentsWriterPerThreadPool); - //ConstructorInfo ctor = clazz.GetConstructor(new[] { typeof(int), typeof(Random) }); - ////ctor.Accessible = true; - //// random thread pool - //setIndexerThreadPoolMethod.Invoke(c, new[] { ctor.Invoke(new object[] { maxNumThreadStates, r }) }); - - // LUCENENET specific: Since we are using InternalsVisibleTo, there is no need for Reflection - c.SetIndexerThreadPool(new RandomDocumentsWriterPerThreadPool(maxNumThreadStates, random)); - } - else - { - // random thread pool - c.SetMaxThreadStates(maxNumThreadStates); - } + // LUCENENET specific - Removed RandomDocumentsWriterPerThreadPool, as was done in Lucene 4.8.1 (see #208) + c.SetMaxThreadStates(maxNumThreadStates); } #if !FEATURE_INSTANCE_TESTDATA_INITIALIZATION c.SetMergePolicy(NewMergePolicy(random)); diff --git a/src/Lucene.Net.Tests/Index/TestFlushByRamOrCountsPolicy.cs b/src/Lucene.Net.Tests/Index/TestFlushByRamOrCountsPolicy.cs index 58bd570bb3..4f479261c5 100644 --- a/src/Lucene.Net.Tests/Index/TestFlushByRamOrCountsPolicy.cs +++ b/src/Lucene.Net.Tests/Index/TestFlushByRamOrCountsPolicy.cs @@ -91,7 +91,7 @@ protected internal virtual void RunFlushByRam(int numThreads, double maxRamMB, b IndexWriterConfig iwc = NewIndexWriterConfig(TEST_VERSION_CURRENT, analyzer).SetFlushPolicy(flushPolicy); int numDWPT = 1 + AtLeast(2); - DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(numDWPT); + DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(numDWPT); iwc.SetIndexerThreadPool(threadPool); iwc.SetRAMBufferSizeMB(maxRamMB); iwc.SetMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH); @@ -153,7 +153,7 @@ public virtual void TestFlushDocCount() IndexWriterConfig iwc = NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)).SetFlushPolicy(flushPolicy); int numDWPT = 1 + AtLeast(2); - DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(numDWPT); + DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(numDWPT); iwc.SetIndexerThreadPool(threadPool); iwc.SetMaxBufferedDocs(2 + AtLeast(10)); iwc.SetRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH); @@ -206,7 +206,7 @@ public virtual void TestRandom() iwc.SetFlushPolicy(flushPolicy); int numDWPT = 1 + Random.Next(8); - DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(numDWPT); + DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(numDWPT); iwc.SetIndexerThreadPool(threadPool); IndexWriter writer = new IndexWriter(dir, iwc); @@ -277,7 +277,7 @@ public virtual void TestStallControl() FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy(); iwc.SetFlushPolicy(flushPolicy); - DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(numThreads[i] == 1 ? 1 : 2); + DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(numThreads[i] == 1 ? 1 : 2); iwc.SetIndexerThreadPool(threadPool); // with such a small ram buffer we should be stalled quiet quickly iwc.SetRAMBufferSizeMB(0.25); diff --git a/src/Lucene.Net.Tests/Index/TestIndexWriterConfig.cs b/src/Lucene.Net.Tests/Index/TestIndexWriterConfig.cs index 11b761c2a0..41e7f500ad 100644 --- a/src/Lucene.Net.Tests/Index/TestIndexWriterConfig.cs +++ b/src/Lucene.Net.Tests/Index/TestIndexWriterConfig.cs @@ -81,7 +81,7 @@ public virtual void TestDefaults() Assert.IsNull(conf.MergedSegmentWarmer); Assert.AreEqual(IndexWriterConfig.DEFAULT_READER_TERMS_INDEX_DIVISOR, conf.ReaderTermsIndexDivisor); Assert.AreEqual(typeof(TieredMergePolicy), conf.MergePolicy.GetType()); - Assert.AreEqual(typeof(ThreadAffinityDocumentsWriterThreadPool), conf.IndexerThreadPool.GetType()); + Assert.AreEqual(typeof(DocumentsWriterPerThreadPool), conf.IndexerThreadPool.GetType()); Assert.AreEqual(typeof(FlushByRamOrCountsPolicy), conf.FlushPolicy.GetType()); Assert.AreEqual(IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB, conf.RAMPerThreadHardLimitMB); Assert.AreEqual(Codec.Default, conf.Codec); diff --git a/src/Lucene.Net.Tests/Index/TestStressIndexing2.cs b/src/Lucene.Net.Tests/Index/TestStressIndexing2.cs index 32097493b4..9a93772b8d 100644 --- a/src/Lucene.Net.Tests/Index/TestStressIndexing2.cs +++ b/src/Lucene.Net.Tests/Index/TestStressIndexing2.cs @@ -224,7 +224,7 @@ public virtual DocsAndWriter IndexRandomIWReader(int nThreads, int iterations, i public virtual IDictionary IndexRandom(int nThreads, int iterations, int range, Directory dir, int maxThreadStates, bool doReaderPooling) { IDictionary docs = new Dictionary(); - IndexWriter w = RandomIndexWriter.MockIndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)).SetOpenMode(OpenMode.CREATE).SetRAMBufferSizeMB(0.1).SetMaxBufferedDocs(maxBufferedDocs).SetIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates)).SetReaderPooling(doReaderPooling).SetMergePolicy(NewLogMergePolicy()), new YieldTestPoint(this)); + IndexWriter w = RandomIndexWriter.MockIndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)).SetOpenMode(OpenMode.CREATE).SetRAMBufferSizeMB(0.1).SetMaxBufferedDocs(maxBufferedDocs).SetIndexerThreadPool(new DocumentsWriterPerThreadPool(maxThreadStates)).SetReaderPooling(doReaderPooling).SetMergePolicy(NewLogMergePolicy()), new YieldTestPoint(this)); LogMergePolicy lmp = (LogMergePolicy)w.Config.MergePolicy; lmp.NoCFSRatio = 0.0; lmp.MergeFactor = mergeFactor; diff --git a/src/Lucene.Net/Index/DocumentsWriter.cs b/src/Lucene.Net/Index/DocumentsWriter.cs index abe72746e6..8772f3680d 100644 --- a/src/Lucene.Net/Index/DocumentsWriter.cs +++ b/src/Lucene.Net/Index/DocumentsWriter.cs @@ -526,7 +526,7 @@ internal bool UpdateDocuments(IEnumerable> docs, An } finally { - perThread.Unlock(); + perThreadPool.Release(perThread); } return PostUpdate(flushingDWPT, hasEvents); @@ -572,7 +572,7 @@ internal bool UpdateDocument(IEnumerable doc, Analyzer analyzer } finally { - perThread.Unlock(); + perThreadPool.Release(perThread); } return PostUpdate(flushingDWPT, hasEvents); @@ -809,7 +809,7 @@ private void PutEvent(IEvent @event) internal sealed class ApplyDeletesEvent : IEvent { internal static readonly IEvent INSTANCE = new ApplyDeletesEvent(); - private int instCount = 0; + private static int instCount = 0; // LUCENENET: Made static, otherwise this makes no sense at all internal ApplyDeletesEvent() { @@ -826,7 +826,7 @@ public void Process(IndexWriter writer, bool triggerMerge, bool forcePurge) internal sealed class MergePendingEvent : IEvent { internal static readonly IEvent INSTANCE = new MergePendingEvent(); - private int instCount = 0; + private static int instCount = 0; // LUCENENET: Made static, otherwise this makes no sense at all internal MergePendingEvent() { @@ -843,7 +843,7 @@ public void Process(IndexWriter writer, bool triggerMerge, bool forcePurge) internal sealed class ForcedPurgeEvent : IEvent { internal static readonly IEvent INSTANCE = new ForcedPurgeEvent(); - private int instCount = 0; + private static int instCount = 0; // LUCENENET: Made static, otherwise this makes no sense at all internal ForcedPurgeEvent() { diff --git a/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs b/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs index 346d834a40..c955aecc84 100644 --- a/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs +++ b/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs @@ -613,8 +613,8 @@ internal ThreadState ObtainAndLock() { if (!success) // make sure we unlock if this fails { - perThread.Unlock(); - } + perThreadPool.Release(perThread); + } } } diff --git a/src/Lucene.Net/Index/DocumentsWriterPerThreadPool.cs b/src/Lucene.Net/Index/DocumentsWriterPerThreadPool.cs index 1e356d87a0..60a34f1cb6 100644 --- a/src/Lucene.Net/Index/DocumentsWriterPerThreadPool.cs +++ b/src/Lucene.Net/Index/DocumentsWriterPerThreadPool.cs @@ -1,3 +1,4 @@ +// Lucene version compatibility level: 4.8.1 using Lucene.Net.Diagnostics; using Lucene.Net.Support.Threading; using System; @@ -37,7 +38,8 @@ namespace Lucene.Net.Index /// is reusing the flushing s with a /// new instance. /// - internal abstract class DocumentsWriterPerThreadPool + + internal sealed class DocumentsWriterPerThreadPool #if FEATURE_CLONEABLE : System.ICloneable #endif @@ -152,9 +154,12 @@ public DocumentsWriterPerThread DocumentsWriterPerThread public bool IsFlushPending => flushPending; } - private ThreadState[] threadStates; + private readonly ThreadState[] threadStates; private volatile int numThreadStatesActive; + private readonly ThreadState[] freeList; + private volatile int freeCount; + /// /// Creates a new with a given maximum of s. /// @@ -170,38 +175,29 @@ internal DocumentsWriterPerThreadPool(int maxNumThreadStates) { threadStates[i] = new ThreadState(null); } + freeList = new ThreadState[maxNumThreadStates]; } - public virtual object Clone() + public object Clone() { // We should only be cloned before being used: if (numThreadStatesActive != 0) { throw new InvalidOperationException("clone this object before it is used!"); } - - DocumentsWriterPerThreadPool clone; - - clone = (DocumentsWriterPerThreadPool)base.MemberwiseClone(); - - clone.threadStates = new ThreadState[threadStates.Length]; - for (int i = 0; i < threadStates.Length; i++) - { - clone.threadStates[i] = new ThreadState(null); - } - return clone; + return new DocumentsWriterPerThreadPool(threadStates.Length); } /// /// Returns the max number of instances available in this /// /// - public virtual int MaxThreadStates => threadStates.Length; + public int MaxThreadStates => threadStates.Length; /// /// Returns the active number of instances. /// - public virtual int NumThreadStatesActive => numThreadStatesActive; // LUCENENET NOTE: Changed from getActiveThreadState() because the name wasn't clear + public int NumThreadStatesActive => numThreadStatesActive; // LUCENENET NOTE: Changed from getActiveThreadState() because the name wasn't clear /// /// Returns a new iff any new state is available otherwise @@ -211,40 +207,36 @@ public virtual object Clone() /// /// a new iff any new state is available otherwise /// null - public virtual ThreadState NewThreadState() + public ThreadState NewThreadState() { - lock (this) + if (Debugging.AssertsEnabled) Debugging.Assert(numThreadStatesActive < threadStates.Length); + + ThreadState threadState = threadStates[numThreadStatesActive]; + threadState.Lock(); // lock so nobody else will get this ThreadState + bool unlock = true; + try { - if (numThreadStatesActive < threadStates.Length) + if (threadState.IsActive) { - ThreadState threadState = threadStates[numThreadStatesActive]; - threadState.@Lock(); // lock so nobody else will get this ThreadState - bool unlock = true; - try - { - if (threadState.IsActive) - { - // unreleased thread states are deactivated during DW#close() - numThreadStatesActive++; // increment will publish the ThreadState - if (Debugging.AssertsEnabled) Debugging.Assert(threadState.dwpt == null); - unlock = false; - return threadState; - } - // unlock since the threadstate is not active anymore - we are closed! - if (Debugging.AssertsEnabled) Debugging.Assert(AssertUnreleasedThreadStatesInactive()); - return null; - } - finally - { - if (unlock) - { - // in any case make sure we unlock if we fail - threadState.Unlock(); - } - } + // unreleased thread states are deactivated during DW#close() + numThreadStatesActive++; // increment will publish the ThreadState + //System.out.println("activeCount=" + numThreadStatesActive); + if (Debugging.AssertsEnabled) Debugging.Assert(threadState.dwpt == null); + unlock = false; + return threadState; } + // we are closed: unlock since the threadstate is not active anymore + if (Debugging.AssertsEnabled) Debugging.Assert(AssertUnreleasedThreadStatesInactive()); return null; } + finally + { + if (unlock) + { + // in any case make sure we unlock if we fail + threadState.Unlock(); + } + } } private bool AssertUnreleasedThreadStatesInactive() @@ -270,7 +262,7 @@ private bool AssertUnreleasedThreadStatesInactive() /// /// Deactivate all unreleased threadstates /// - internal virtual void DeactivateUnreleasedStates() + internal void DeactivateUnreleasedStates() { lock (this) { @@ -290,7 +282,7 @@ internal virtual void DeactivateUnreleasedStates() } } - internal virtual DocumentsWriterPerThread Reset(ThreadState threadState, bool closed) + internal DocumentsWriterPerThread Reset(ThreadState threadState, bool closed) { if (Debugging.AssertsEnabled) Debugging.Assert(threadState.IsHeldByCurrentThread); DocumentsWriterPerThread dwpt = threadState.dwpt; @@ -305,14 +297,83 @@ internal virtual DocumentsWriterPerThread Reset(ThreadState threadState, bool cl return dwpt; } - internal virtual void Recycle(DocumentsWriterPerThread dwpt) + internal void Recycle(DocumentsWriterPerThread dwpt) { // don't recycle DWPT by default } // you cannot subclass this without being in o.a.l.index package anyway, so // the class is already pkg-private... fix me: see LUCENE-4013 - public abstract ThreadState GetAndLock(Thread requestingThread, DocumentsWriter documentsWriter); // LUCENENET NOTE: Made public rather than internal + public ThreadState GetAndLock(Thread requestingThread, DocumentsWriter documentsWriter) + { + ThreadState threadState = null; + lock (this) + { + for (;;) + { + if (freeCount > 0) + { + // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a + // limited number of thread states: + threadState = freeList[freeCount - 1]; + if (threadState.dwpt == null) + { + // This thread-state is not initialized, e.g. it + // was just flushed. See if we can instead find + // another free thread state that already has docs + // indexed. This way if incoming thread concurrency + // has decreased, we don't leave docs + // indefinitely buffered, tying up RAM. This + // will instead get those thread states flushed, + // freeing up RAM for larger segment flushes: + for (int i = 0; i < freeCount; i++) + { + if (freeList[i].dwpt != null) + { + // Use this one instead, and swap it with + // the un-initialized one: + ThreadState ts = freeList[i]; + freeList[i] = threadState; + threadState = ts; + break; + } + } + } + + freeCount--; + break; + } + else if (NumThreadStatesActive < threadStates.Length) + { + // ThreadState is already locked before return by this method: + return NewThreadState(); + } + else + { + // Wait until a thread state frees up: + Monitor.Wait(this); + } + } + } + + // This could take time, e.g. if the threadState is [briefly] checked for flushing: + threadState.Lock(); + + return threadState; + } + + public void Release(ThreadState state) + { + state.Unlock(); + lock (this) + { + Debug.Assert(freeCount < freeList.Length); + freeList[freeCount++] = state; + // In case any thread is waiting, wake one of them up since we just released a thread state; notify() should be sufficient but we do + // notifyAll defensively: + Monitor.PulseAll(this); + } + } /// /// Returns the ith active where i is the @@ -322,7 +383,7 @@ internal virtual void Recycle(DocumentsWriterPerThread dwpt) /// the ordinal of the /// the ith active where i is the /// given ord. - internal virtual ThreadState GetThreadState(int ord) + internal ThreadState GetThreadState(int ord) { return threadStates[ord]; } @@ -332,7 +393,7 @@ internal virtual ThreadState GetThreadState(int ord) /// waiting to acquire its lock or null if no /// is yet visible to the calling thread. /// - internal virtual ThreadState MinContendedThreadState() + internal ThreadState MinContendedThreadState() { ThreadState minThreadState = null; int limit = numThreadStatesActive; @@ -352,7 +413,7 @@ internal virtual ThreadState MinContendedThreadState() /// A deactivated should not be used for indexing anymore. /// /// the number of currently deactivated instances. - internal virtual int NumDeactivatedThreadStates() + internal int NumDeactivatedThreadStates() { int count = 0; for (int i = 0; i < threadStates.Length; i++) @@ -380,7 +441,7 @@ internal virtual int NumDeactivatedThreadStates() /// if the parent is closed or aborted. /// /// the state to deactivate - internal virtual void DeactivateThreadState(ThreadState threadState) + internal void DeactivateThreadState(ThreadState threadState) { if (Debugging.AssertsEnabled) Debugging.Assert(threadState.IsActive); threadState.Deactivate(); diff --git a/src/Lucene.Net/Index/IndexWriterConfig.cs b/src/Lucene.Net/Index/IndexWriterConfig.cs index 79ffc7cde3..105bf90241 100644 --- a/src/Lucene.Net/Index/IndexWriterConfig.cs +++ b/src/Lucene.Net/Index/IndexWriterConfig.cs @@ -387,7 +387,7 @@ public object Clone() /// Expert: Gets or sets the instance used by the /// to assign thread-states to incoming indexing threads. If no /// is set will use - /// with max number of + /// with max number of /// thread-states set to (see /// ). /// @@ -429,14 +429,17 @@ public object Clone() { try { - return ((ThreadAffinityDocumentsWriterThreadPool)indexerThreadPool).MaxThreadStates; + return indexerThreadPool.MaxThreadStates; } catch (InvalidCastException cce) { throw new InvalidOperationException(cce.Message, cce); } } - set => this.indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(value); + set + { + this.indexerThreadPool = new DocumentsWriterPerThreadPool(value); + } } /// diff --git a/src/Lucene.Net/Index/LiveIndexWriterConfig.cs b/src/Lucene.Net/Index/LiveIndexWriterConfig.cs index f031115d3b..b564d45cd1 100644 --- a/src/Lucene.Net/Index/LiveIndexWriterConfig.cs +++ b/src/Lucene.Net/Index/LiveIndexWriterConfig.cs @@ -169,7 +169,7 @@ internal LiveIndexWriterConfig(Analyzer analyzer, LuceneVersion matchVersion) mergePolicy = new TieredMergePolicy(); flushPolicy = new FlushByRamOrCountsPolicy(); readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING; - indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES); + indexerThreadPool = new DocumentsWriterPerThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES); perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB; } @@ -504,7 +504,7 @@ public virtual int MaxThreadStates { try { - return ((ThreadAffinityDocumentsWriterThreadPool)indexerThreadPool).MaxThreadStates; + return indexerThreadPool.MaxThreadStates; } catch (InvalidCastException cce) { diff --git a/src/Lucene.Net/Index/ThreadAffinityDocumentsWriterThreadPool.cs b/src/Lucene.Net/Index/ThreadAffinityDocumentsWriterThreadPool.cs deleted file mode 100644 index 58d4fb790d..0000000000 --- a/src/Lucene.Net/Index/ThreadAffinityDocumentsWriterThreadPool.cs +++ /dev/null @@ -1,95 +0,0 @@ -using Lucene.Net.Diagnostics; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; -using System.Threading; - -namespace Lucene.Net.Index -{ - /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - using ThreadState = Lucene.Net.Index.DocumentsWriterPerThreadPool.ThreadState; //javadoc - - /// - /// A implementation that tries to assign an - /// indexing thread to the same each time the thread tries to - /// obtain a . Once a new is created it is - /// associated with the creating thread. Subsequently, if the threads associated - /// is not in use it will be associated with the requesting - /// thread. Otherwise, if the is used by another thread - /// tries to find the currently - /// minimal contended . - /// - internal class ThreadAffinityDocumentsWriterThreadPool : DocumentsWriterPerThreadPool - { - private IDictionary threadBindings = new ConcurrentDictionary(); - - /// - /// Creates a new with a given maximum of s. - /// - public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) - : base(maxNumPerThreads) - { - if (Debugging.AssertsEnabled) Debugging.Assert(MaxThreadStates >= 1); - } - - public override ThreadState GetAndLock(Thread requestingThread, DocumentsWriter documentsWriter) - { - if (threadBindings.TryGetValue(requestingThread, out ThreadState threadState) && threadState.TryLock()) - { - return threadState; - } - ThreadState minThreadState = null; - - /* TODO -- another thread could lock the minThreadState we just got while - we should somehow prevent this. */ - // Find the state that has minimum number of threads waiting - minThreadState = MinContendedThreadState(); - if (minThreadState == null || minThreadState.HasQueuedThreads) - { - ThreadState newState = NewThreadState(); // state is already locked if non-null - if (newState != null) - { - if (Debugging.AssertsEnabled) Debugging.Assert(newState.IsHeldByCurrentThread); - threadBindings[requestingThread] = newState; - return newState; - } - else if (minThreadState == null) - { - /* - * no new threadState available we just take the minContented one - * this must return a valid thread state since we accessed the - * synced context in newThreadState() above. - */ - minThreadState = MinContendedThreadState(); - } - } - if (Debugging.AssertsEnabled) Debugging.Assert(minThreadState != null, () => "ThreadState is null"); - - minThreadState.@Lock(); - return minThreadState; - } - - public override object Clone() - { - ThreadAffinityDocumentsWriterThreadPool clone = (ThreadAffinityDocumentsWriterThreadPool)base.Clone(); - clone.threadBindings = new ConcurrentDictionary(); - return clone; - } - } -} \ No newline at end of file diff --git a/src/Lucene.Net/Lucene.Net.csproj b/src/Lucene.Net/Lucene.Net.csproj index e218980298..e0372d082a 100644 --- a/src/Lucene.Net/Lucene.Net.csproj +++ b/src/Lucene.Net/Lucene.Net.csproj @@ -116,7 +116,7 @@ - +