HBASE-27529 Provide RS coproc ability to attach WAL extended attributes to mutations at replication sink (#4924)

Signed-off-by: Andrew Purtell <[email protected]>
virajjasani committed Jan 16, 2023
1 parent 6244891 commit 6d2064d
Showing 8 changed files with 401 additions and 5 deletions.
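
For background: these hooks are the sink-side complement of WAL extended attributes (HBASE-22622), which let a source-cluster coprocessor tag WAL entries via the preWALAppend hook (HBASE-22623). A minimal source-side sketch, assuming HBase 2.3+ where RegionObserver#preWALAppend and WALKey#addExtendedAttribute are available; the class name SourceAttributeObserver and the attribute key/value are made up for illustration:

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;

/** Hypothetical source-side observer that tags every WAL entry before it ships. */
public class SourceAttributeObserver implements RegionCoprocessor, RegionObserver {

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
    WALEdit edit) throws IOException {
    // The extended attribute rides along in the WALKey and is shipped to the
    // replication sink together with the entry.
    key.addExtendedAttribute("source-cluster", Bytes.toBytes("cluster-a"));
  }
}

With this commit, a RegionServerObserver on the sink cluster can read those attributes back when the replicated mutations are applied; see the new hooks below.
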
File: RegionServerObserver.java
@@ -19,10 +19,13 @@

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/**
* Defines coprocessor hooks for interacting with operations on the
* {@link org.apache.hadoop.hbase.regionserver.HRegionServer} process. Since most implementations
@@ -137,4 +140,33 @@ default void preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
default void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* This will be called before replication sink mutations are executed on the sink table as part
* of a batch call.
* @param ctx the environment to interact with the framework and region server.
* @param walEntry the WAL entry from which the mutation is formed.
* @param mutation the mutation to be applied at the sink cluster.
* @throws IOException if something goes wrong.
*/
default void preReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {

}

/**
* This will be called after replication sink mutations are executed on the sink table as part
* of a batch call.
* @param ctx the environment to interact with the framework and region server.
* @param walEntry the WAL entry from which the mutation is formed.
* @param mutation the mutation that was applied at the sink cluster.
* @throws IOException if something goes wrong.
*/
default void postReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {

}

}
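
As an illustration of the two hooks above, a sink-side observer can copy WALKey extended attributes onto each mutation before it is applied, making them visible to region-level coprocessors on the sink. A minimal sketch, assuming the source cluster populated the extended attributes and that the shaded protobuf accessors generated for HBASE-22622 (WALProtos.Attribute, getExtendedAttributesList) are available; WalAttributeObserver is a hypothetical name:

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/** Hypothetical sink-side observer that re-attaches WAL extended attributes. */
public class WalAttributeObserver implements RegionServerCoprocessor, RegionServerObserver {

  @Override
  public Optional<RegionServerObserver> getRegionServerObserver() {
    return Optional.of(this);
  }

  @Override
  public void preReplicationSinkBatchMutate(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
    Mutation mutation) throws IOException {
    // Copy each extended attribute from the replicated WALKey onto the mutation
    // so that sink-side region coprocessors can observe it.
    for (WALProtos.Attribute attr : walEntry.getKey().getExtendedAttributesList()) {
      mutation.setAttribute(attr.getKey(), attr.getValue().toByteArray());
    }
  }
}
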
File: RegionServerCoprocessorHost.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SharedConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
@@ -41,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

@InterfaceAudience.Private
public class RegionServerCoprocessorHost
extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
@@ -173,6 +176,26 @@ public void call(RegionServerObserver observer) throws IOException {
});
}

public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
public void call(RegionServerObserver observer) throws IOException {
observer.preReplicationSinkBatchMutate(this, walEntry, mutation);
}
});
}

public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
public void call(RegionServerObserver observer) throws IOException {
observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
}
});
}

public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
throws IOException {
if (this.coprocEnvironments.isEmpty()) {
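
For these host methods to fire, the observer has to be loaded on the sink cluster's region servers. A hedged wiring sketch (e.g., for a test), reusing the hypothetical WalAttributeObserver from above; the helper class name is made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;

public final class SinkCoprocSetup {
  public static Configuration sinkConf() {
    Configuration conf = HBaseConfiguration.create();
    // Equivalent to listing the class under hbase.coprocessor.regionserver.classes
    // in the sink cluster's hbase-site.xml.
    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
      WalAttributeObserver.class.getName());
    return conf;
  }
}
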
File: ReplicationSinkServiceImpl.java
@@ -28,6 +28,8 @@
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
@@ -72,7 +74,11 @@ public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir

@Override
public void startReplicationService() throws IOException {
-   this.replicationSink = new ReplicationSink(this.conf);
+   RegionServerCoprocessorHost rsServerHost = null;
+   if (server instanceof HRegionServer) {
+     rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
+   }
+   this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
"ReplicationSinkStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
}
File: Replication.java
@@ -32,6 +32,8 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -190,7 +192,11 @@ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
@Override
public void startReplicationService() throws IOException {
this.replicationManager.init();
-   this.replicationSink = new ReplicationSink(this.conf);
+   RegionServerCoprocessorHost rsServerHost = null;
+   if (server instanceof HRegionServer) {
+     rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
+   }
+   this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
File: ReplicationSink.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -97,13 +98,17 @@ public class ReplicationSink {
*/
private final int rowSizeWarnThreshold;

private final RegionServerCoprocessorHost rsServerHost;

/**
* Create a sink for replication
* @param conf conf object
* @throws IOException thrown when HDFS goes bad or bad file name
*/
-   public ReplicationSink(Configuration conf) throws IOException {
+   public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
+     throws IOException {
this.conf = HBaseConfiguration.create(conf);
this.rsServerHost = rsServerHost;
rowSizeWarnThreshold =
conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
decorateConf();
@@ -160,6 +165,8 @@ private void decorateConf() {
/**
* Replicate this array of entries directly into the local cluster using the native client. Only
* operates against raw protobuf type saving on a conversion from pb to pojo.
* @param entries WAL entries to be replicated.
* @param cells cell scanner for iteration.
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
@@ -180,6 +187,8 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
new Pair<>(new ArrayList<>(), new ArrayList<>());
for (WALEntry entry : entries) {
TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
if (this.walEntrySinkFilter != null) {
@@ -231,6 +240,11 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
clusterIds.add(toUUID(clusterId));
}
mutation.setClusterIds(clusterIds);
if (rsServerHost != null) {
rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
mutationsToWalEntriesPairs.getFirst().add(mutation);
mutationsToWalEntriesPairs.getSecond().add(entry);
}
addToHashMultiMap(rowMap, table, clusterIds, mutation);
}
if (CellUtil.isDelete(cell)) {
@@ -253,6 +267,14 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
LOG.debug("Finished replicating mutations.");
}

if (rsServerHost != null) {
List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
for (int i = 0; i < mutations.size(); i++) {
rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
}
}

if (bulkLoadsPerClusters != null) {
for (Entry<List<String>,
Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
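
Note that postReplicationSinkBatchMutate is invoked only after the whole batch has been applied at the sink, as the loop above shows, so the post hook suits per-mutation auditing or metrics rather than rewriting. A sketch of such an observer, again with a hypothetical class name and assuming the generated getExtendedAttributesCount/getTableName accessors on the shaded WALKey protobuf:

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/** Hypothetical audit observer for the sink cluster. */
public class ReplicationAuditObserver implements RegionServerCoprocessor, RegionServerObserver {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationAuditObserver.class);

  @Override
  public Optional<RegionServerObserver> getRegionServerObserver() {
    return Optional.of(this);
  }

  @Override
  public void postReplicationSinkBatchMutate(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
    Mutation mutation) throws IOException {
    // Runs after the batch was applied at the sink; log what was carried over.
    LOG.debug("Replicated mutation to table {} with {} WAL extended attribute(s)",
      walEntry.getKey().getTableName().toStringUtf8(),
      walEntry.getKey().getExtendedAttributesCount());
  }
}
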
(The remaining three changed files are not shown.)
