Skip to content

Commit

Permalink
Move IndexShard#getWritingBytes() under InternalEngine (#27209)
Browse files Browse the repository at this point in the history
We do some accounting in IndexShard that is not necessarily correct since
we maintain two different index readers. This change moves the accounting under
the engine which knows what reader we are refreshing.

Relates to #26972
  • Loading branch information
s1monw committed Nov 2, 2017
1 parent 6157426 commit 173ae09
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ public MergeStats getMergeStats() {
/** returns the history uuid for the engine */
public abstract String getHistoryUUID();

/** Returns how many bytes we are currently moving from heap to disk, via either a refresh or an IndexWriter flush */
public abstract long getWritingBytes();

/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
Expand Down Expand Up @@ -707,7 +710,7 @@ protected void writerSegmentStats(SegmentsStats stats) {
}

/** How much heap is used that would be freed by a refresh. Note that this may throw {@link AlreadyClosedException}. */
public abstract long getIndexBufferRAMBytesUsed();

protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
ensureOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public class InternalEngine extends Engine {
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
* being indexed/deleted.
*/
private final AtomicLong writingBytes = new AtomicLong();

@Nullable
private final String historyUUID;
Expand Down Expand Up @@ -422,6 +428,12 @@ public String getHistoryUUID() {
return historyUUID;
}

/**
 * Reports the number of bytes currently in flight from the in-memory indexing
 * buffer to segments on disk (see {@code writingBytes}, updated by refresh).
 */
@Override
public long getWritingBytes() {
    final long bytesInFlight = writingBytes.get();
    return bytesInFlight;
}

/**
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
* translog id into lucene and returns null.
Expand Down Expand Up @@ -1230,21 +1242,26 @@ public void refresh(String source) throws EngineException {
}

final void refresh(String source, SearcherScope scope) throws EngineException {
long bytes = 0;
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
bytes = indexWriter.ramBytesUsed();
switch (scope) {
case EXTERNAL:
// even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc.
writingBytes.addAndGet(bytes);
externalSearcherManager.maybeRefreshBlocking();
// the break here is intentional we never refresh both internal / external together
break;
case INTERNAL:
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
bytes += versionMapBytes;
writingBytes.addAndGet(bytes);
internalSearcherManager.maybeRefreshBlocking();
break;

default:
throw new IllegalArgumentException("unknown scope: " + scope);
}
Expand All @@ -1258,6 +1275,8 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
} finally {
writingBytes.addAndGet(-bytes);
}

// TODO: maybe we should just put a scheduled job in threadPool?
Expand All @@ -1271,24 +1290,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
public void writeIndexingBuffer() throws EngineException {
    // An internal refresh both clears the version map and moves the indexing buffer to disk,
    // so we simply delegate; refresh() takes the read lock itself (we don't want a flush to
    // happen while we are writing, since it flushes the index as well), handles the
    // writingBytes accounting, and translates failures into engine failure / RefreshFailedEngineException.
    refresh("write indexing buffer", SearcherScope.INTERNAL);
}

@Override
Expand Down
58 changes: 9 additions & 49 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final QueryCachingPolicy cachingPolicy;
private final Supplier<Sort> indexSortSupplier;

/**
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
* being indexed/deleted.
*/
private final AtomicLong writingBytes = new AtomicLong();
private final SearchOperationListener searchOperationListener;

protected volatile ShardRouting shardRouting;
Expand Down Expand Up @@ -324,12 +318,6 @@ public Store store() {
public Sort getIndexSort() {
return indexSortSupplier.get();
}
/**
* returns true if this shard supports indexing (i.e., write) operations.
*/
public boolean canIndex() {
return true;
}

public ShardGetService getService() {
return this.getService;
Expand Down Expand Up @@ -840,34 +828,21 @@ public Engine.GetResult get(Engine.Get get) {
*/
public void refresh(String source) {
verifyNotClosed();

if (canIndex()) {
long bytes = getEngine().getIndexBufferRAMBytesUsed();
writingBytes.addAndGet(bytes);
try {
if (logger.isTraceEnabled()) {
logger.trace("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
}
getEngine().refresh(source);
} finally {
if (logger.isTraceEnabled()) {
logger.trace("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
}
writingBytes.addAndGet(-bytes);
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("refresh with source [{}]", source);
}
getEngine().refresh(source);
if (logger.isTraceEnabled()) {
logger.trace("refresh with source [{}]", source);
}
getEngine().refresh(source);
}

/**
 * Returns how many bytes we are currently moving from heap to disk
 */
public long getWritingBytes() {
    // The engine owns the accounting now; a shard whose engine is not started
    // (or already closed) is moving nothing, so report 0 rather than failing.
    Engine engine = getEngineOrNull();
    if (engine == null) {
        return 0;
    }
    return engine.getWritingBytes();
}

public RefreshStats refreshStats() {
Expand Down Expand Up @@ -1678,24 +1653,9 @@ private void handleRefreshException(Exception e) {
* Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
*/
public void writeIndexingBuffer() {
if (canIndex() == false) {
throw new UnsupportedOperationException();
}
try {
Engine engine = getEngine();
long bytes = engine.getIndexBufferRAMBytesUsed();

// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
// memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
// there's still up to the 20% being used and continue writing if necessary:
logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
writingBytes.addAndGet(bytes);
try {
engine.writeIndexingBuffer();
} finally {
writingBytes.addAndGet(-bytes);
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
}
engine.writeIndexingBuffer();
} catch (Exception e) {
handleRefreshException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ ByteSizeValue indexingBufferSize() {
protected List<IndexShard> availableShards() {
List<IndexShard> availableShards = new ArrayList<>();
for (IndexShard shard : indexShards) {
// shadow replica doesn't have an indexing buffer
if (shard.canIndex() && CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
if (CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
availableShards.add(shard);
}
}
Expand Down

0 comments on commit 173ae09

Please sign in to comment.