Skip to content

Commit

Permalink
HBASE-26422 Support priority select reference files for compaction of…
Browse files Browse the repository at this point in the history
… stripe store engine
  • Loading branch information
sunhelly committed Nov 18, 2021
1 parent b2571df commit 4f05e7b
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,8 @@ public interface Store {
* if you try to set a configuration.
*/
Configuration getReadOnlyConfiguration();

/**
 * Builds a display name for this store.
 * @return the encoded name of the region and the column family name, joined by a colon
 */
default String getName() {
  final String encodedRegionName = getRegionInfo().getEncodedName();
  final String familyName = getColumnFamilyName();
  return encodedRegionName + ":" + familyName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
Expand Down Expand Up @@ -171,4 +173,14 @@ public static Configuration createStoreConfiguration(Configuration conf, TableDe
return new CompoundConfiguration().add(conf).addBytesMap(td.getValues())
.addStringMap(cfd.getConfiguration()).addBytesMap(cfd.getValues());
}

/**
 * Collects the reference files out of the given store files.
 * <p>
 * A file is kept when it reports itself as a reference, or when its path is recognized as a
 * reference by {@code StoreFileInfo.isReference}. The path is only inspected when the file does
 * not already report itself as a reference.
 * @param files the store files to inspect
 * @return a new list holding the reference files, in the iteration order of {@code files}
 */
public static List<HStoreFile> filteredReferenceFiles(final Collection<HStoreFile> files) {
  final List<HStoreFile> selected = new ArrayList<>();
  for (HStoreFile candidate : files) {
    if (!candidate.isReference() && !StoreFileInfo.isReference(candidate.getPath())) {
      // neither check recognizes this file as a reference, skip it
      continue;
    }
    selected.add(candidate);
  }
  return selected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
*/
package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;

/**
* Configuration class for stripe store and compactions.
Expand Down Expand Up @@ -65,6 +65,11 @@ public class StripeStoreConfig {
public static final String MAX_REGION_SPLIT_IMBALANCE_KEY =
"hbase.store.stripe.region.split.max.imbalance";

/**
* Configuration key that enables priority selection of reference files for compaction in
* StripeCompactionPolicy. The feature is disabled unless this key is set to true.
*/
public static final String PRIORITY_COMPACT_REFERENCE_FILES_ENABLED =
"hbase.store.stripe.region.priority.compact.reference.files.enabled";

private final float maxRegionSplitImbalance;
private final int level0CompactMinFiles;
Expand All @@ -77,6 +82,8 @@ public class StripeStoreConfig {
private final boolean flushIntoL0;
private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount

private final boolean priorityCompactRefsEnabled;

private static final double EPSILON = 0.001; // good enough for this, not a real epsilon.
public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4);
Expand Down Expand Up @@ -109,6 +116,8 @@ public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
}
this.initialCount = initialCount;
this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount);
this.priorityCompactRefsEnabled =
config.getBoolean(PRIORITY_COMPACT_REFERENCE_FILES_ENABLED, false);
}

private static float getFloat(
Expand Down Expand Up @@ -163,4 +172,8 @@ public float getSplitCount() {
/**
 * @return the split part size, which the constructor derives from the size to split at and
 *   the split part count
 */
public long getSplitPartSize() {
  return this.splitPartSize;
}

/**
 * @return whether priority selection of reference files for compaction is enabled, as read
 *   from {@link #PRIORITY_COMPACT_REFERENCE_FILES_ENABLED}
 */
public boolean isPriorityCompactRefsEnabled() {
  return this.priorityCompactRefsEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected void createComponents(
Configuration conf, HStore store, CellComparator comparator) throws IOException {
this.config = new StripeStoreConfig(conf, store);
this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config, store);
this.storeFlusher = new StripeStoreFlusher(
conf, store, this.compactionPolicy, this.storeFileManager);
this.compactor = new StripeCompactor(conf, store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,31 @@ private static class State {

private final int blockingFileCount;

private final String storeName;

/**
 * Creates the stripe store file manager.
 * @param kvComparator comparator kept as this manager's cell comparator
 * @param conf configuration from which the blocking store file count is read
 *   ({@code HStore.BLOCKING_STOREFILES_KEY}, falling back to
 *   {@code HStore.DEFAULT_BLOCKING_STOREFILE_COUNT})
 * @param config the stripe store configuration kept by this manager
 * @param store the store this manager belongs to, used only to obtain the store name; may be
 *   {@code null}, in which case the store name is the empty string
 */
public StripeStoreFileManager(
CellComparator kvComparator, Configuration conf, StripeStoreConfig config) {
CellComparator kvComparator, Configuration conf, StripeStoreConfig config, HStore store) {
this.cellComparator = kvComparator;
this.config = config;
this.blockingFileCount = conf.getInt(
HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
// A null store is tolerated; fall back to an empty name so getStoreName() never returns null.
if(store != null) {
storeName = store.getName();
} else {
storeName = "";
}
}

/**
 * Loads the given store files by handing them to {@code loadUnclassifiedStoreFiles}.
 * @param storeFiles the store files to load
 */
@Override
public void loadFiles(List<HStoreFile> storeFiles) {
  this.loadUnclassifiedStoreFiles(storeFiles);
}

/**
 * @return the store name captured by the constructor; the empty string when the manager was
 *   created without a store
 */
@Override
public String getStoreName() {
  return this.storeName;
}

@Override
public Collection<HStoreFile> getStorefiles() {
return state.allFilesCached;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -41,6 +42,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

/**
Expand Down Expand Up @@ -116,6 +118,87 @@ public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
// compact-all-things behavior.
Collection<HStoreFile> allFiles = si.getStorefiles();
if (StoreUtils.hasReferences(allFiles)) {
if (config.isPriorityCompactRefsEnabled()) {
// try to only select reference files
LOG.info("There are references in the store {}, compact reference files only. ",
si.getStoreName());
List<HStoreFile> l0References = StoreUtils.filteredReferenceFiles(si.getLevel0Files());
if (!l0References.isEmpty()) {
boolean needSelect = needSelectFiles(l0References);
if (needSelect) {
// Needing to select among the L0 reference files means the L0 references are very large.
// In that case, compact large stripes first, to make sure the stripes do not become
// too large after the large L0 compaction.
StripeCompactionRequest result = selectSingleStripeCompaction(
si, false, false, false);
if (result != null) {
LOG.debug("Performing one whole stripe split compaction after split, {}",
si.getStoreName());
return result;
}
}
List<HStoreFile> toCompactL0Refs = needSelect ?
selectSimpleCompaction(l0References, false, false, true) :
l0References;
assert !toCompactL0Refs.isEmpty() : "To compact reference files should not be empty";
String msg = "";
if (LOG.isDebugEnabled()) {
msg = String.format("Compact L0 references only after split. %d store files, "
+ "%d L0 files, %d reference files length %d, to "
+ "compact %d reference files with length %d, store %s", allFiles.size(),
si.getLevel0Files().size(), l0References.size(), getTotalFileSize(l0References),
toCompactL0Refs.size(), getTotalFileSize(toCompactL0Refs), si.getStoreName());
}
StripeCompactionRequest request;
if (si.getStripeCount() > 0) {
// do L0 reference compaction, will perform boundary compaction
LOG.debug(msg + ". Performing boundary compaction.");
request =
new BoundaryStripeCompactionRequest(toCompactL0Refs, si.getStripeBoundaries());
} else {
// do L0 reference compaction, will perform split compaction
LOG.debug(msg + ". Performing split stripe compaction.");
long targetKvs =
estimateTargetKvs(toCompactL0Refs, config.getInitialCount()).getFirst();
request = new SplitStripeCompactionRequest(toCompactL0Refs, OPEN_KEY, OPEN_KEY, targetKvs);
}
request.getRequest().setAfterSplit(true);
request.getRequest().setIsMajor(false, false);
return request;
}
// select reference files in a single stripe
int priorityStripe = getStripeIndexWithReferences(si);
if (priorityStripe != -1) {
LOG.debug("The stripe {} has reference files, select all files in this stripe to "
+ "compact, store {}", priorityStripe, si.getStoreName());
Collection<HStoreFile> priorityStripeFiles = si.getStripes().get(priorityStripe);
int targetCount = 1;
long targetKvs = Long.MAX_VALUE;
long toCompactSize = getTotalFileSize(priorityStripeFiles);
if (toCompactSize >= config.getSplitSize()) {
Pair<Long, Integer> estimate =
estimateTargetKvs(priorityStripeFiles, config.getSplitCount());
targetCount = estimate.getSecond();
targetKvs = estimate.getFirst();
}
String splitString =
"; the stripe will be split into at most " + targetCount + " stripes with "
+ targetKvs + " target KVs, toCompact files size is " + toCompactSize;
StripeCompactionRequest request =
new SplitStripeCompactionRequest(priorityStripeFiles,
si.getStartRow(priorityStripe),
si.getEndRow(priorityStripe), targetCount, targetKvs);
LOG.debug(
"Priority compact stripe {} all files, selecting {} files, " + " store {}",
priorityStripe, request.getRequest().getFiles().size(),
si.getStoreName() + splitString);
request.getRequest().setAfterSplit(true);
request.getRequest().setIsMajor(false, false);
return request;
}
}

// the priority compact reference files is disabled, so compact all files after split
LOG.debug("There are references in the store; compacting all files");
long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst();
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
Expand Down Expand Up @@ -162,6 +245,35 @@ public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak);
}

/**
 * Decides whether the participants are too many, or too large in total, to all go into a
 * single compaction request.
 * @param participants the candidate store files
 * @return true if the file count exceeds the stripe compaction max file count or the total
 *   file size exceeds the max compaction size, so only part of the participants should be
 *   selected; false otherwise
 */
private boolean needSelectFiles(final List<HStoreFile> participants) {
  if (participants.size() > this.config.getStripeCompactMaxFiles()) {
    // too many files; the total size does not need to be computed
    return true;
  }
  return getTotalFileSize(participants) > comConf.getMaxCompactSize();
}

/**
 * Finds the first stripe that contains reference files.
 * <p>
 * Stripes are scanned in index order and the scan stops at the first stripe for which
 * {@code StoreUtils.hasReferences} is true.
 * @param si the stripe information provider
 * @return the index of the first stripe with reference files, in the range [0, n-1], or -1
 *   if no stripe has reference files
 */
private int getStripeIndexWithReferences(StripeInformationProvider si) {
  List<ImmutableList<HStoreFile>> stripeFiles = si.getStripes();
  for (int i = 0; i < stripeFiles.size(); ++i) {
    if (!StoreUtils.hasReferences(stripeFiles.get(i))) {
      continue;
    }
    if (LOG.isDebugEnabled()) {
      // The end row is a byte array: passing it straight to the logger would print the array
      // identity instead of the row key, so render its content explicitly. The guard avoids
      // building that string when debug logging is off.
      LOG.debug("Stripe {} has references, endRow {}, store {}", i,
        java.util.Arrays.toString(si.getEndRow(i)), si.getStoreName());
    }
    return i;
  }
  // no stripe holds a reference file
  return -1;
}

public boolean needsCompactions(StripeInformationProvider si, List<HStoreFile> filesCompacting) {
// Approximation on whether we need compaction.
return filesCompacting.isEmpty()
Expand Down Expand Up @@ -559,6 +671,12 @@ public void setMajorRangeFull() {

/** The information about stripes that the policy needs to do its stuff */
public static interface StripeInformationProvider {
/**
 * Gets the name of the store, intended for use in log messages.
 * @return the store name
 */
String getStoreName();

public Collection<HStoreFile> getStorefiles();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ private static StripeStoreFileManager createManager(
StripeStoreConfig config = new StripeStoreConfig(
conf, Mockito.mock(StoreConfigInformation.class));
StripeStoreFileManager result = new StripeStoreFileManager(CellComparatorImpl.COMPARATOR, conf,
config);
config, null);
result.loadFiles(sfs);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MIN_FILES_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.PRIORITY_COMPACT_REFERENCE_FILES_ENABLED;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.mockito.ArgumentMatcher;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.ListUtils;

@RunWith(Parameterized.class)
@Category({RegionServerTests.class, MediumTests.class})
Expand Down Expand Up @@ -251,8 +254,63 @@ public void testWithReferences() throws Exception {
assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles()));
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
any(), any());
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any());
}

/**
 * With priority compaction of reference files enabled, a store whose L0 holds one reference
 * file and one regular file must select only the reference file.
 */
@Test
public void testPriorityReferences() throws Exception {
  // policy with the priority-reference feature switched on
  Configuration configuration = HBaseConfiguration.create();
  configuration.setBoolean(PRIORITY_COMPACT_REFERENCE_FILES_ENABLED, true);
  StripeCompactionPolicy policy = createPolicy(configuration);
  StripeCompactor compactor = mock(StripeCompactor.class);

  // one reference file plus one regular file, all sitting in L0
  HStoreFile referenceFile = createFile();
  when(referenceFile.isReference()).thenReturn(true);
  StripeInformationProvider provider = mock(StripeInformationProvider.class);
  List<HStoreFile> level0Files = al(referenceFile, createFile());
  when(provider.getStorefiles()).thenReturn(level0Files);
  when(provider.getLevel0Files()).thenReturn(level0Files);
  Collection<HStoreFile> expectedSelection = al(referenceFile);

  assertTrue(policy.needsCompactions(provider, al()));
  StripeCompactionPolicy.StripeCompactionRequest request =
    policy.selectCompaction(provider, al(), false);
  assertTrue(ListUtils.isEqualList(expectedSelection, request.getRequest().getFiles()));

  request.execute(compactor, NoLimitThroughputController.INSTANCE, null);
  // this compaction request has no major range
  verify(compactor, only()).compact(eq(request.getRequest()), anyInt(), anyLong(),
    aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any(), any(), any());
}

/**
 * With priority compaction of reference files enabled and more L0 reference files than the
 * configured maximum file count, the policy must select exactly that maximum number of files,
 * and every selected file must be a reference.
 */
@Test
public void testPrioritySelectReferences() throws Exception {
  Configuration conf = HBaseConfiguration.create();
  conf.setBoolean(PRIORITY_COMPACT_REFERENCE_FILES_ENABLED, true);
  // min == max pins the expected size of the selection
  int compactFileCount = 2;
  conf.setInt(MIN_FILES_KEY, compactFileCount);
  conf.setInt(MAX_FILES_KEY, compactFileCount);
  StripeCompactionPolicy policy = createPolicy(conf);
  StripeInformationProvider si = mock(StripeInformationProvider.class);
  // 10 reference files followed by 10 regular files, all in L0
  List<HStoreFile> sfs = mockRefs(10, 10);
  when(si.getStorefiles()).thenReturn(sfs);
  when(si.getLevel0Files()).thenReturn(sfs);

  assertTrue(policy.needsCompactions(si, al()));
  StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
  Collection<HStoreFile> selected = scr.getRequest().getFiles();
  assertEquals(compactFileCount, selected.size());
  // the count alone would also pass if regular files were picked; check what was selected
  for (HStoreFile sf : selected) {
    assertTrue(sf.isReference());
  }
}

/**
 * Creates a list of store files: first the files stubbed to report themselves as references,
 * then the files left as {@code createFile()} produced them.
 * @param refCount number of files stubbed as references
 * @param otherCount number of files that are not stubbed as references
 * @return the created files, reference files first
 */
private List<HStoreFile> mockRefs(int refCount, int otherCount) throws Exception {
  List<HStoreFile> created = new ArrayList<>(refCount + otherCount);
  for (int remaining = refCount; remaining > 0; remaining--) {
    HStoreFile referenceFile = createFile();
    when(referenceFile.isReference()).thenReturn(true);
    created.add(referenceFile);
  }
  for (int remaining = otherCount; remaining > 0; remaining--) {
    created.add(createFile());
  }
  return created;
}

@Test
Expand Down

0 comments on commit 4f05e7b

Please sign in to comment.