-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HBASE-27529 Provide RS coproc ability to attach WAL extended attributes to mutations at replication sink #4924
Conversation
🎊 +1 overall
This message was automatically generated. |
f0a04cb
to
91a9af9
Compare
@@ -265,6 +266,11 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells, | |||
mutation.setClusterIds(clusterIds); | |||
mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, | |||
HConstants.EMPTY_BYTE_ARRAY); | |||
if (attributeList != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is WAL extended attribute, not mutation attribute? I do not think we should just set them as attributes here...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There has to be a consumer of WAL extended attribute. The primary purpose for anything associated with WALKey is meant for deriving appropriate Mutation behavior or replication related behavior (like list of cluster ids). Hence if we allow coproc to set the WAL extended attributes, we should also let the attributes be consumed at sink side. Otherwise we won't be able to have any consumer of these attributes, correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goal of the WAL extended attribute is to pass some attributes down to the replication implementation from the coproc implementation, not to add it to the mutation's attribute. We should not mix things up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While WALKey can be used by coproc (IA.LP), but since coprocs reply on hbase cross-cluster replication, the sink side mutation attributes can be expected to be set up by HBase since hbase prepares mutations from WAL entries.
Only if WALKey extended attributes are set by coproc, hbase would attach the attributes to mutation. Does this sound good to you @Apache9 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goal of the WAL extended attribute is to pass some attributes down to the replication implementation from the coproc implementation
That is correct but coproc would add it only so that it can consume it at sink side correct? Otherwise who else would read WAL attributes from WALEntry at sink side? HBase does it, while preparing for sink side mutations.
At sink side, once corpoc receives mutations generated by replication, it cannot correlate the mutation with WAL extended attributes on it's own. Getting these attributes consumed at sink side is the primary reason why corpoc would like to pass some attributes down to replication implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And if phoenix wants this feature, please describe more about at lease one usage. Let's see how to better support it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And also, you can read it in ReplicationSink at the sink cluster, no problem.
But once sink reads it, how else can it pass the attributes to coproc at sink side?
The problem here is you want to add the WAL attributes to mutation's attributes.
What if that's the behavior coproc expects? Would you be fine with configurable behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you read these attributes at the source cluster's coproc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And if phoenix wants this feature, please describe more about at lease one usage. Let's see how to better support it.
So let me take a simple example. Phoenix supports multi-tenancy by providing multi-tenant connection. Hence, same table can be shared by multiple tenants. This requires tenant id to be a rowkey prefix in hbase rowkey. When a particular tenant creates tenant connection and writes data, those data are not visible to other tenants.
Tenant id is very basic entity in phoenix. Now let's say we want to introduce some level of caching in phoenix server side, the cache would contain tenant id.
For HBase sink cluster, mutation is already created for specific table because table name is available in WALKey. For phoenix, let's say table name is available and mutation has full rowkey as well but it's very difficult to derive tenant id from this rowkey. Hence, phoenix coproc needs to know of tenant id attribute with mutation so that it can form tenant level connection or derive further attributes at sink side and so on.
In addition to tenant id, there are a few more imp metadata that is required at sink side coproc, without which it's not possible to be aware of how the data is meant to be used.
How do you read these attributes at the source cluster's coproc?
If we take same example as above, multi tenant connection is already created by client and hence tenant id is always available with all operations performed at source cluster coproc. Besides some of the imp attributes are already attached as mutation attributes at source side. But when the mutations are generated at sink side, while hbase metadata is available, coproc like phoenix metadata is not available and it would be great if we can attach these metadata attributes to sink mutation, otherwise sink coproc would not be able to derive them.
Making this configurable feature with proper documentation is also fine, if you don't mind. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the sake of this example, let's say that tenant id (part of the extended attribute in WALKey) is as important to phoenix, as is rowkey to hbase. It's quite basic entity.
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
Let's get back to this. For me, I do not think adding WAL attributes to mutation directly is the correct way to deal with this. Even introducing a flag seems weird, Maybe we could add coprocessor support or add new type of filter for ReplicationSink, so Phoenix could add the special logic by your own, i.e, adding the WAL attributes to mutation. Thanks. |
* attributes. This is useful for source cluster coproc to provide coproc specific metadata as WAL | ||
* annotations and have them attached back to Mutations generated from WAL entries at sink side. | ||
*/ | ||
public static final String HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't need to be configurable. Mutation attributes will be ignored if they are not expected so this is harmless to clients. And IMHO the right thing to be doing. I agree with your premise.
Normally WAL entries will not have attributes, so this is doubly harmless for standard use cases.
@@ -265,6 +273,11 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells, | |||
mutation.setClusterIds(clusterIds); | |||
mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, | |||
HConstants.EMPTY_BYTE_ARRAY); | |||
if (this.conf.getBoolean(HBASE_REPLICATION_SINK_ATTRIBUTES_WAL_TO_MUTATIONS, false)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not need to be conditional behavior.
For what it's worth, I think it would be fine, but I see your point. Consider what we are designing end to end. At the client on the source side, the client creates a mutation with an attribute containing some metadata. The design question before us is how to do that last part. I think introducing a default conversion from WAL entry attribute to mutation attribute would sufficient, but to @Apache9 's point, a symmetric flow here would have a pair of coprocessors involved: the first transforming attribute to metadata at the source client, the latter transforming metadata to attribute at the sink client. What I would not like to see here is a new config. It is quite use case specific and the accumulation of these makes problems due to configuration errors all the more likely. Optional behavior can be provided by a plug in implementation of WALentry to mutation transformation logic. So a coprocessor option indeed makes sense. |
This is exactly what the purpose of this patch was initially.
I agree here and that's why I kept the initial version without including any configs.
Yeah this also doesn't seem bad idea @Apache9. |
06f8c67
to
969ee38
Compare
.../test/java/org/apache/hadoop/hbase/replication/TestReplicationWithWALExtendedAttributes.java
Show resolved
Hide resolved
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
...e-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
Show resolved
Hide resolved
969ee38
to
2153de7
Compare
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
* @throws IOException if something goes wrong. | ||
*/ | ||
default void preReplicationSinkBatchMutate( | ||
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wish we didn't have these protobuf types in the LimitedPrivate coprocessor API. We are going to have to live within compatibility constraints. However it would be prohibitively expensive to convert from protobuf types to HBase types (and back) just for coprocessors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with @apurtell that exposing pb message in CP is not very good but I checked the code, we never convert the WALEntry to a none pb one so there is no good way for us to not use pb message here. Let's keep it like this for now.
* @throws IOException If failed to replicate the data | ||
*/ | ||
public void replicateEntries(List<WALEntry> entries, final CellScanner cells, | ||
String replicationClusterId, String sourceBaseNamespaceDirPath, | ||
String sourceHFileArchiveDirPath) throws IOException { | ||
String sourceHFileArchiveDirPath, RegionServerCoprocessorHost rsServerHost) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose the RegionServerCroprocessorHost should be initialized when creating ReplicationSink?
I checked the code, maybe in ReplicationSinkServiceImpl, we could check whether the Server is an instance of HRegionServer, if so, we get its RegionServerCoprocessorHost for creating the ReplicationSink. And we could also reuse the AsyncClusterConnection of the Server, instead of creating a new one here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, that's what I have added as part of this patch:
@Override
public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells,
String replicationClusterId, String sourceBaseNamespaceDirPath,
String sourceHFileArchiveDirPath) throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
}
this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, rsServerHost);
}
So we do check for server instance to be regionserver, only then we pass non-null rs coproc host instance to this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean we should pass RegionServerCoprocessorHost in when creating ReplicationSink, and store it as the a class field, so we do not need to pass it everytime when calling replicateEntries...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see, yes that sounds good, will make these changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is addressed. Thanks @Apache9!
* @throws IOException if something goes wrong. | ||
*/ | ||
default void preReplicationSinkBatchMutate( | ||
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with @apurtell that exposing pb message in CP is not very good but I checked the code, we never convert the WALEntry to a none pb one so there is no good way for us to not use pb message here. Let's keep it like this for now.
2153de7
to
40d86db
Compare
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
…es to mutations at replication sink (#4924) Signed-off-by: Andrew Purtell <[email protected]>
…es to mutations at replication sink (#4924) Signed-off-by: Andrew Purtell <[email protected]>
…es to mutations at replication sink (#4924) Signed-off-by: Andrew Purtell <[email protected]>
Be lated +1. Thanks @virajjasani ! |
…es to mutations at replication sink (apache#4924) Signed-off-by: Andrew Purtell <[email protected]> (cherry picked from commit 6d2064d) Change-Id: I3aea43c4a2c61e01c9638d7a642f772fd0a6e039
HBase provides coproc ability to enhance WALKey attributes (a.k.a. WAL annotations) in order for the replication sink cluster to build required metadata with the mutations. The endpoint is preWALAppend(). This ability was provided by HBASE-22622. The map of extended attributes is optional and hence not directly used by hbase internally.
For any hbase downstreamers to build CDC (Change Data Capture) like functionality, it might required additional metadata in addition to the ones being used by hbase already (replication scope, list of cluster ids, seq id, table name, region id etc). For instance, Phoenix uses many additional attributes like tenant id, schema name, table type etc.
We already have this extended map of attributes available in WAL protobuf, to provide us the capability to (de)serialize it. While creating new ReplicateWALEntryRequest from the list of WAL entires, we are able to serialize the additional attributes. Similarly, at the replication sink side, the deserialized WALEntry has the extended attributes available.
At the sink cluster, we should be able to attach the deserialized extended attributes to the newly generated mutations so that the peer cluster can utilize the mutation attributes to re-build required metadata.