-
Notifications
You must be signed in to change notification settings - Fork 233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce API v2 #494
Introduce API v2 #494
Conversation
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
kv/api_version.go
Outdated
return append(GetV2Prefix(mode), key...) | ||
} | ||
|
||
func DecodeV2StartKey(mode Mode, key []byte) []byte { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest to combine the following methods as DecodeV2KeyRange
. Then it would be easier to be used as startKey, endKey := DecodeV2KeyRange(startKey, endKey)
.
kv/api_version.go
Outdated
func DecodeV2StartKey(mode Mode, key []byte) []byte { | ||
minKey := GetV2Prefix(mode) | ||
if bytes.Compare(key, minKey) < 0 { | ||
return []byte{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest to return error, other than return an unexpected value silently.
rawkv/rawkv.go
Outdated
@@ -176,7 +198,7 @@ func (c *Client) Get(ctx context.Context, key []byte, options ...RawOption) ([]b | |||
req := tikvrpc.NewRequest( | |||
tikvrpc.CmdRawGet, | |||
&kvrpcpb.RawGetRequest{ | |||
Key: key, | |||
Key: c.buildRequestKey(key), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we will need to encode keys twice (one for request PD & another for request TiKV) ? How about do the encoding before here. And then it also seems that we don't need the CodecPDClientV2
.
rawkv/rawkv.go
Outdated
return key | ||
} | ||
|
||
func (c *Client) buildRequestEndKey(key []byte) []byte { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest to implement this method as buildRequestKeyRange
, as "end key" & empty string only make sense in a range.
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
internal/client/api_version.go
Outdated
) | ||
|
||
var ( | ||
ApiV2RawKeyPrefix = []byte("r") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to add prefix reserved for keyspace now.
internal/locate/region_cache.go
Outdated
@@ -483,6 +484,15 @@ func (c *RegionCache) SetPDClient(client pd.Client) { | |||
c.pdClient = client | |||
} | |||
|
|||
func (c *RegionCache) GetApiVersion() kvrpcpb.APIVersion { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not store api version when creating RegionCache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ApiVersion
should be removed from RegionCache
, and moved to RegionRequestSender
. The API version could be derived while constructing RegionRequestSender
.
@@ -756,7 +777,7 @@ func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls [ | |||
var batches []kvrpc.Batch | |||
// split the keys by size and RegionVerID | |||
for regionID, groupKeys := range groups { | |||
batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, keyTottl, rawBatchPutSize) | |||
batches = kvrpc.AppendBatches(batches, regionID, groupKeys, keyToValue, keyToTTL, rawBatchPutSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can combine keyToValue and keyToTTL to a single map to save memory and lookup
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
rawkv/rawkv.go
Outdated
@@ -148,6 +149,26 @@ func NewClient(ctx context.Context, pdAddrs []string, security config.Security, | |||
}, nil | |||
} | |||
|
|||
// NewClientV2 creates a client with PD cluster addrs with api version of v2. | |||
func NewClientV2(ctx context.Context, pdAddrs []string, security config.Security, opts ...pd.ClientOption) (*Client, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about support API v2 by options, and add a new constructor (e.g., NewClientOpt
) to accept the opts ...RawOption
?
The opts ...RawOption
on constructing Client
is required, when recently I'm trying to solve a proxy issue. I find that it's difficult to assign WithContextDialer
to underlying gRPC connections. Compared with pd/client
which has pd ClientOptions and a WithGRPCDialOptions method, , it's quite easy to do so.
Usage example of pd client with gRPC dialer:
dialProxy, err := proxy.SOCKS5("tcp", "127.0.0.1:1080", nil, proxy.Direct)
grpcOpt := grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
return dialProxy.(proxy.ContextDialer).DialContext(ctx, "tcp", address)
})
pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{
CAPath: security.ClusterSSLCA,
CertPath: security.ClusterSSLCert,
KeyPath: security.ClusterSSLKey,
}, pd.WithGRPCDialOptions(grpcOpt))
Signed-off-by: iosmanthus <[email protected]>
internal/client/client.go
Outdated
@@ -121,21 +121,22 @@ type connArray struct { | |||
done chan struct{} | |||
} | |||
|
|||
func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32, enableBatch bool, dialTimeout time.Duration) (*connArray, error) { | |||
func newConnArray(maxSize uint, addr string, security config.Security, | |||
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, opts ...grpc.DialOption) (*connArray, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest to use opts []grpc.DialOption
instead, in case we will have more argument later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense to me, since it's a private internal function.
rawkv/rawkv.go
Outdated
@@ -131,20 +166,32 @@ func (c *Client) SetColumnFamily(columnFamily string) *Client { | |||
} | |||
|
|||
// NewClient creates a client with PD cluster addrs. | |||
func NewClient(ctx context.Context, pdAddrs []string, security config.Security, opts ...pd.ClientOption) (*Client, error) { | |||
func NewClient(ctx context.Context, pdAddrs []string, opts ...ClientOpt) (*Client, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure whether it is a good idea to change method arguments and break compatibility. Maybe we can add a new method and implement NewClient
by the new one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about creating a new function called NewClientWithOpts
Signed-off-by: iosmanthus <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The NewClientWithOpts
LGTM~
@@ -91,6 +91,17 @@ type RPCRuntimeStats = locate.RPCRuntimeStats | |||
// CodecPDClient wraps a PD Client to decode the encoded keys in region meta. | |||
type CodecPDClient = locate.CodecPDClient | |||
|
|||
type CodecPDClientV2 = locate.CodecPDClientV2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need some comment.
rawkv/rawkv.go
Outdated
@@ -37,6 +37,7 @@ package rawkv | |||
import ( | |||
"bytes" | |||
"context" | |||
"google.golang.org/grpc" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not in good order
Signed-off-by: iosmanthus <[email protected]>
var err error | ||
var err error | ||
oldMeta := meta | ||
switch c.pdClient.(type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not move decodeRegionMetaKeyWithShallowCopy
into interface and use interface to dispatch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because c.pdClient is a pd.Client
interface already, if we change the interface, we need to modify the PD.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see
internal/locate/region_request.go
Outdated
@@ -189,8 +190,18 @@ func RecordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, | |||
|
|||
// NewRegionRequestSender creates a new sender. | |||
func NewRegionRequestSender(regionCache *RegionCache, client client.Client) *RegionRequestSender { | |||
var v kvrpcpb.APIVersion | |||
|
|||
switch regionCache.pdClient.(type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type assertion looks verbose, we can cache the version somewhere in RegionCache or simply have a method in client to return API version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could store the API version in the region cache, however, the type assertion is still needed while constructing the region cache. By this lifting, we could reduce type assertion in runtime, since the region cache construction happens only once for each client.
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
var err error | ||
var err error | ||
oldMeta := meta | ||
switch c.pdClient.(type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
Signed-off-by: iosmanthus <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Compatibility test will be fixed in a separate PR. |
Signed-off-by: iosmanthus [email protected]
Introducing API V2 for client-go, all the interfaces of RawKV have been upgraded, and their tests are migrated to detect the API version of the cluster.