HBASE-26938 Introduce a StoreFileWriterCreationTracker
Apache9 committed Apr 14, 2022
1 parent f990f56 commit 79fa3a9
Showing 23 changed files with 351 additions and 242 deletions.
@@ -28,6 +28,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.function.Consumer;
 import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -51,6 +53,7 @@
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -61,7 +64,6 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -146,10 +148,14 @@ public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> s
   @Override
   public StoreFileWriter createWriter(InternalScanner scanner,
     org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-    boolean shouldDropBehind, boolean major) throws IOException {
+    boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+    throws IOException {
     // make this writer with tags always because of possible new cells with tags.
-    return store.getStoreEngine().createWriter(
-      createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
+    return store.getStoreEngine()
+      .createWriter(
+        createParams(fd, shouldDropBehind, major, writerCreationTracker)
+          .includeMVCCReadpoint(true)
+          .includesTag(true));
   }
 };
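The heart of the change is visible in this hunk: writer creation now takes a `Consumer<Path>`, so whoever owns the compaction or flush can record every store file path the moment its writer is created. Below is a minimal sketch of the tracking idea — plain Java, using `String` paths instead of Hadoop's `Path`, with all names invented for illustration; it is not HBase's actual implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative sketch only: a tracker that remembers which store files
// currently have an open writer, so a cleanup pass knows not to delete them.
class WriterCreationTrackerSketch {
  private final Set<String> filesBeingWritten = ConcurrentHashMap.newKeySet();

  // Handed to createWriter(...) as the Consumer; called once per new writer.
  Consumer<String> tracker() {
    return filesBeingWritten::add;
  }

  // Called once the writer is committed or aborted.
  void writerClosed(String path) {
    filesBeingWritten.remove(path);
  }

  // Cleanup consults this before deleting an unreferenced file.
  boolean isBeingWritten(String path) {
    return filesBeingWritten.contains(path);
  }
}
```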

@@ -285,17 +291,19 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
    * </ol>
    * @param fd File details
    * @param scanner Where to read from.
+   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
    * @param major Is a major compaction.
    * @param numofFilesToCompact the number of files to compact
+   * @param progress Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-    boolean major, int numofFilesToCompact) throws IOException {
+    boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Clear old mob references
@@ -661,9 +669,8 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
     }
   }
 
-
   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
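Both `performCompaction` and `commitWriter` now take the writer as an explicit parameter instead of reading it from a field, which suggests the caller owns the writer's lifecycle: it creates and registers the writer, the worker method only appends through it, and commit happens back in the caller. A sketch of that pattern under those assumptions — every name here is illustrative, not HBase's:

```java
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

// Illustrative ownership pattern: create/register/commit in the caller,
// write-only access in the worker.
class CompactionOwnershipSketch {
  interface CellSink { void append(String cell) throws IOException; }

  // Worker: no longer creates or commits the writer it writes to.
  boolean performCompaction(List<String> cells, CellSink writer) throws IOException {
    for (String cell : cells) {
      writer.append(cell);
    }
    return true;
  }

  void compact(List<String> cells, Consumer<String> writerCreationTracker) throws IOException {
    String newFile = "store/newfile"; // illustrative path
    writerCreationTracker.accept(newFile); // register before writing
    StringBuilder sink = new StringBuilder();
    if (performCompaction(cells, sink::append)) {
      // commitWriter(writer, fd, request) would finalize metadata here
    }
  }
}
```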
@@ -67,7 +67,7 @@ public void init(StoreScanner sourceScanner, WriterFactory factory) {
    * comments in HBASE-15400 for more details.
    */
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
-    return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET);
+    return commitWriters(maxSeqId, majorCompaction, Collections.emptyList());
   }
 
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
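This hunk also swaps the raw-typed `Collections.EMPTY_SET` constant for the generic `Collections.emptyList()` factory. A small demo of why that matters at compile time (the `process` method is hypothetical, just to show the compiler behavior):

```java
import java.util.Collection;
import java.util.Collections;

class EmptyCollectionDemo {
  // Hypothetical consumer of a typed collection.
  static void process(Collection<String> names) {}

  public static void main(String[] args) {
    process(Collections.emptyList());   // type-safe: <String> is inferred
    // process(Collections.EMPTY_SET);  // raw type: unchecked conversion warning
  }
}
```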
@@ -110,11 +110,7 @@ public List<Path> abortWriters() {
     return paths;
   }
 
-  /**
-   * Returns all writers. This is used to prevent deleting currently writen storefiles
-   * during cleanup.
-   */
-  public abstract Collection<StoreFileWriter> writers();
+  protected abstract Collection<StoreFileWriter> writers();
 
   /**
    * Subclasses override this method to be called at the end of a successful sequence of append; all
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.function.Consumer;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +42,8 @@ public final class CreateStoreFileWriterParams {
 
   private String fileStoragePolicy = HConstants.EMPTY_STRING;
 
+  private Consumer<Path> writerCreationTracker;
+
   private CreateStoreFileWriterParams() {
   }
 
@@ -127,8 +131,16 @@ public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) {
     return this;
   }
 
+  public Consumer<Path> writerCreationTracker() {
+    return writerCreationTracker;
+  }
+
+  public CreateStoreFileWriterParams writerCreationTracker(Consumer<Path> writerCreationTracker) {
+    this.writerCreationTracker = writerCreationTracker;
+    return this;
+  }
+
   public static CreateStoreFileWriterParams create() {
     return new CreateStoreFileWriterParams();
   }
 
 }
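The new accessor/mutator pair follows the class's existing fluent style, so callers can chain the tracker alongside the other parameters. A hedged usage sketch: `create()`, `includeMVCCReadpoint`, `includesTag`, and `writerCreationTracker` all appear in this commit, while the wrapping class and method are invented for illustration.

```java
import java.util.function.Consumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;

class ParamsWiringSketch {
  // Fluent wiring of the tracker; the Consumer stands in for whatever
  // callback the store actually supplies.
  CreateStoreFileWriterParams buildParams(Consumer<Path> writerCreationTracker) {
    return CreateStoreFileWriterParams.create()
      .includeMVCCReadpoint(true)
      .includesTag(true)
      .writerCreationTracker(writerCreationTracker);
  }
}
```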
@@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException {
   }
 
   @Override
-  public Collection<StoreFileWriter> writers() {
+  protected Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -44,8 +45,8 @@ public DefaultStoreFlusher(Configuration conf, HStore store) {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-    MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException {
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -59,7 +60,7 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = createWriter(snapshot, false);
+      writer = createWriter(snapshot, false, writerCreationTracker);
       IOException e = null;
       try {
         performFlush(scanner, writer, throughputController);
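`flushSnapshot` now simply forwards the tracker into `createWriter`. One plausible reading of the design is that a path must be registered with the tracker before the file exists on disk, so a concurrent cleanup can never observe an untracked store file. A self-contained sketch of that ordering, with invented names (this is an assumption about intent, not confirmed by the diff):

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

// Sketch of register-then-create ordering: the new file's path is handed
// to the tracker before any bytes are written, so a concurrent cleanup
// never sees the file without its registration.
final class TrackedWriters {
  static Writer create(Path file, Consumer<Path> writerCreationTracker) throws IOException {
    writerCreationTracker.accept(file); // register first
    return Files.newBufferedWriter(file); // then create the file on disk
  }
}
```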
(diffs for the remaining 18 changed files not shown)

1 comment on commit 79fa3a9

@apurtell

In case it wasn't clear I closed PR apache#4338 so we could go with this approach. I like it.
