HBASE-27529 Provide RS coproc ability to attach WAL extended attributes to mutations at replication sink #4924

Merged (7 commits) on Jan 16, 2023
@@ -19,10 +19,13 @@

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/**
* Defines coprocessor hooks for interacting with operations on the
* {@link org.apache.hadoop.hbase.regionserver.HRegionServer} process. Since most implementations
@@ -137,4 +140,33 @@ default void preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment>
default void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* This will be called before replication sink mutations are executed on the sink table as part
* of a batch call.
* @param ctx the environment to interact with the framework and region server.
* @param walEntry the WAL entry from which the mutation was formed.
* @param mutation the mutation to be applied at the sink cluster.
* @throws IOException if something goes wrong.
*/
default void preReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {

}

Review comment (apurtell, Contributor): I wish we didn't have these protobuf types in the LimitedPrivate coprocessor API. We are going to have to live within compatibility constraints. However, it would be prohibitively expensive to convert from protobuf types to HBase types (and back) just for coprocessors.

Reply (Contributor): Agree with @apurtell that exposing a pb message in the CP API is not great, but I checked the code and we never convert the WALEntry to a non-pb one, so there is no good way to avoid the pb message here. Let's keep it like this for now.

/**
* This will be called after replication sink mutations are executed on the sink table as part
* of a batch call.
* @param ctx the environment to interact with the framework and region server.
* @param walEntry the WAL entry from which the mutation was formed.
* @param mutation the mutation that was applied at the sink cluster.
* @throws IOException if something goes wrong.
*/
default void postReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {

}

}
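
For illustration, here is a minimal observer sketch (not part of this change) showing how the new hook can attach WAL extended attributes to sink-side mutations, which is the use case HBASE-27529 targets. The class name is hypothetical, and it assumes the WALKey protobuf carries extended attributes via getExtendedAttributesList(), as added by HBASE-22622:

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

// Hypothetical observer: copies every WAL extended attribute onto the
// mutation before it is applied at the replication sink.
public class WALAttributeSinkObserver implements RegionServerCoprocessor, RegionServerObserver {

  @Override
  public Optional<RegionServerObserver> getRegionServerObserver() {
    return Optional.of(this);
  }

  @Override
  public void preReplicationSinkBatchMutate(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
    Mutation mutation) throws IOException {
    // Assumes the WALKey proto exposes extended attributes (HBASE-22622).
    for (WALProtos.Attribute attr : walEntry.getKey().getExtendedAttributesList()) {
      mutation.setAttribute(attr.getKey(), attr.getValue().toByteArray());
    }
  }
}

Registered on a sink cluster, this runs once per mutation as replicateEntries() assembles batches; see the ReplicationSink changes below.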
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.SharedConnection;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -40,6 +41,8 @@

import org.apache.hbase.thirdparty.com.google.protobuf.Service;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

@InterfaceAudience.Private
public class RegionServerCoprocessorHost
extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
@@ -166,6 +169,26 @@ public void call(RegionServerObserver observer) throws IOException {
});
}

public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
public void call(RegionServerObserver observer) throws IOException {
observer.preReplicationSinkBatchMutate(this, walEntry, mutation);
}
});
}

public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
public void call(RegionServerObserver observer) throws IOException {
observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
}
});
}

public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
throws IOException {
if (this.coprocEnvironments.isEmpty()) {
@@ -28,6 +28,8 @@
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
@@ -72,7 +74,11 @@ public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir

@Override
public void startReplicationService() throws IOException {
this.replicationSink = new ReplicationSink(this.conf);
RegionServerCoprocessorHost rsServerHost = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
}
this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
"ReplicationSinkStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
}
@@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
@@ -113,13 +114,17 @@ public class ReplicationSink {
private final int rowSizeWarnThreshold;
private boolean replicationSinkTrackerEnabled;

private final RegionServerCoprocessorHost rsServerHost;

/**
* Create a sink for replication
* @param conf conf object
* @param rsServerHost region server coprocessor host; may be null if the enclosing server is
*          not an HRegionServer, in which case the new hooks are skipped.
* @throws IOException thrown when HDFS goes bad or bad file name
*/
public ReplicationSink(Configuration conf) throws IOException {
public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
throws IOException {
this.conf = HBaseConfiguration.create(conf);
this.rsServerHost = rsServerHost;
rowSizeWarnThreshold =
conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
@@ -178,6 +183,8 @@ private void decorateConf() {
/**
* Replicate this array of entries directly into the local cluster using the native client. Only
* operates against the raw protobuf types, saving a conversion from pb to POJO.
* @param entries WAL entries to be replicated.
* @param cells cell scanner for iteration.
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
@@ -200,6 +207,8 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
new Pair<>(new ArrayList<>(), new ArrayList<>());
for (WALEntry entry : entries) {
TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
if (this.walEntrySinkFilter != null) {
@@ -265,6 +274,11 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
mutation.setClusterIds(clusterIds);
mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
HConstants.EMPTY_BYTE_ARRAY);
if (rsServerHost != null) {
rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
mutationsToWalEntriesPairs.getFirst().add(mutation);
mutationsToWalEntriesPairs.getSecond().add(entry);
}
addToHashMultiMap(rowMap, table, clusterIds, mutation);
}
if (CellUtil.isDelete(cell)) {
@@ -287,6 +301,14 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
LOG.debug("Finished replicating mutations.");
}

if (rsServerHost != null) {
List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
for (int i = 0; i < mutations.size(); i++) {
rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
}
}

if (bulkLoadsPerClusters != null) {
for (Entry<List<String>,
Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
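
Taken together, the sink-side flow is: preReplicationSinkBatchMutate fires once per mutation while replicateEntries() assembles the batch, and postReplicationSinkBatchMutate fires for each (WALEntry, Mutation) pair after the whole batch has been applied. To exercise the hooks, an observer must be loaded on the sink cluster's region servers. A hedged sketch, reusing the hypothetical WALAttributeSinkObserver from the example above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;

// Load the hypothetical observer on the sink cluster's region servers,
// e.g. from a mini-cluster test, before the servers start.
Configuration conf = HBaseConfiguration.create();
conf.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
  WALAttributeSinkObserver.class.getName());
// The equivalent hbase-site.xml property is hbase.coprocessor.regionserver.classes.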