Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: refine querycoord meta/catalog related interfaces to ensure that each method includes a ctx parameter #37916

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions internal/metastore/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,23 @@ type DataCoordCatalog interface {
}

type QueryCoordCatalog interface {
SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error
SavePartition(info ...*querypb.PartitionLoadInfo) error
SaveReplica(replicas ...*querypb.Replica) error
GetCollections() ([]*querypb.CollectionLoadInfo, error)
GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error)
GetReplicas() ([]*querypb.Replica, error)
ReleaseCollection(collection int64) error
ReleasePartition(collection int64, partitions ...int64) error
ReleaseReplicas(collectionID int64) error
ReleaseReplica(collection int64, replicas ...int64) error
SaveResourceGroup(rgs ...*querypb.ResourceGroup) error
RemoveResourceGroup(rgName string) error
GetResourceGroups() ([]*querypb.ResourceGroup, error)

SaveCollectionTargets(target ...*querypb.CollectionTarget) error
RemoveCollectionTarget(collectionID int64) error
GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error)
SaveCollection(ctx context.Context, collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error
SavePartition(ctx context.Context, info ...*querypb.PartitionLoadInfo) error
SaveReplica(ctx context.Context, replicas ...*querypb.Replica) error
GetCollections(ctx context.Context) ([]*querypb.CollectionLoadInfo, error)
GetPartitions(ctx context.Context) (map[int64][]*querypb.PartitionLoadInfo, error)
GetReplicas(ctx context.Context) ([]*querypb.Replica, error)
ReleaseCollection(ctx context.Context, collection int64) error
ReleasePartition(ctx context.Context, collection int64, partitions ...int64) error
ReleaseReplicas(ctx context.Context, collectionID int64) error
ReleaseReplica(ctx context.Context, collection int64, replicas ...int64) error
SaveResourceGroup(ctx context.Context, rgs ...*querypb.ResourceGroup) error
RemoveResourceGroup(ctx context.Context, rgName string) error
GetResourceGroups(ctx context.Context) ([]*querypb.ResourceGroup, error)

SaveCollectionTargets(ctx context.Context, target ...*querypb.CollectionTarget) error
RemoveCollectionTarget(ctx context.Context, collectionID int64) error
GetCollectionTargets(ctx context.Context) (map[int64]*querypb.CollectionTarget, error)
}

// StreamingCoordCataLog is the interface for streamingcoord catalog
Expand Down
39 changes: 20 additions & 19 deletions internal/metastore/kv/querycoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package querycoord

import (
"bytes"
"context"
"fmt"
"io"

Expand Down Expand Up @@ -42,7 +43,7 @@ func NewCatalog(cli kv.MetaKv) Catalog {
}
}

func (s Catalog) SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error {
func (s Catalog) SaveCollection(ctx context.Context, collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error {
k := EncodeCollectionLoadInfoKey(collection.GetCollectionID())
v, err := proto.Marshal(collection)
if err != nil {
Expand All @@ -52,10 +53,10 @@ func (s Catalog) SaveCollection(collection *querypb.CollectionLoadInfo, partitio
if err != nil {
return err
}
return s.SavePartition(partitions...)
return s.SavePartition(ctx, partitions...)
}

func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error {
func (s Catalog) SavePartition(ctx context.Context, info ...*querypb.PartitionLoadInfo) error {
for _, partition := range info {
k := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID())
v, err := proto.Marshal(partition)
Expand All @@ -70,7 +71,7 @@ func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error {
return nil
}

func (s Catalog) SaveReplica(replicas ...*querypb.Replica) error {
func (s Catalog) SaveReplica(ctx context.Context, replicas ...*querypb.Replica) error {
kvs := make(map[string]string)
for _, replica := range replicas {
key := encodeReplicaKey(replica.GetCollectionID(), replica.GetID())
Expand All @@ -83,7 +84,7 @@ func (s Catalog) SaveReplica(replicas ...*querypb.Replica) error {
return s.cli.MultiSave(kvs)
}

func (s Catalog) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error {
func (s Catalog) SaveResourceGroup(ctx context.Context, rgs ...*querypb.ResourceGroup) error {
ret := make(map[string]string)
for _, rg := range rgs {
key := encodeResourceGroupKey(rg.GetName())
Expand All @@ -98,12 +99,12 @@ func (s Catalog) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error {
return s.cli.MultiSave(ret)
}

func (s Catalog) RemoveResourceGroup(rgName string) error {
func (s Catalog) RemoveResourceGroup(ctx context.Context, rgName string) error {
key := encodeResourceGroupKey(rgName)
return s.cli.Remove(key)
}

func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
func (s Catalog) GetCollections(ctx context.Context) ([]*querypb.CollectionLoadInfo, error) {
_, values, err := s.cli.LoadWithPrefix(CollectionLoadInfoPrefix)
if err != nil {
return nil, err
Expand All @@ -120,7 +121,7 @@ func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
return ret, nil
}

func (s Catalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) {
func (s Catalog) GetPartitions(ctx context.Context) (map[int64][]*querypb.PartitionLoadInfo, error) {
_, values, err := s.cli.LoadWithPrefix(PartitionLoadInfoPrefix)
if err != nil {
return nil, err
Expand All @@ -137,7 +138,7 @@ func (s Catalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error)
return ret, nil
}

func (s Catalog) GetReplicas() ([]*querypb.Replica, error) {
func (s Catalog) GetReplicas(ctx context.Context) ([]*querypb.Replica, error) {
_, values, err := s.cli.LoadWithPrefix(ReplicaPrefix)
if err != nil {
return nil, err
Expand All @@ -151,7 +152,7 @@ func (s Catalog) GetReplicas() ([]*querypb.Replica, error) {
ret = append(ret, &info)
}

replicasV1, err := s.getReplicasFromV1()
replicasV1, err := s.getReplicasFromV1(ctx)
if err != nil {
return nil, err
}
Expand All @@ -160,7 +161,7 @@ func (s Catalog) GetReplicas() ([]*querypb.Replica, error) {
return ret, nil
}

func (s Catalog) getReplicasFromV1() ([]*querypb.Replica, error) {
func (s Catalog) getReplicasFromV1(ctx context.Context) ([]*querypb.Replica, error) {
_, replicaValues, err := s.cli.LoadWithPrefix(ReplicaMetaPrefixV1)
if err != nil {
return nil, err
Expand All @@ -183,7 +184,7 @@ func (s Catalog) getReplicasFromV1() ([]*querypb.Replica, error) {
return ret, nil
}

func (s Catalog) GetResourceGroups() ([]*querypb.ResourceGroup, error) {
func (s Catalog) GetResourceGroups(ctx context.Context) ([]*querypb.ResourceGroup, error) {
_, rgs, err := s.cli.LoadWithPrefix(ResourceGroupPrefix)
if err != nil {
return nil, err
Expand All @@ -202,7 +203,7 @@ func (s Catalog) GetResourceGroups() ([]*querypb.ResourceGroup, error) {
return ret, nil
}

func (s Catalog) ReleaseCollection(collection int64) error {
func (s Catalog) ReleaseCollection(ctx context.Context, collection int64) error {
// remove collection and obtained partitions
collectionKey := EncodeCollectionLoadInfoKey(collection)
err := s.cli.Remove(collectionKey)
Expand All @@ -213,7 +214,7 @@ func (s Catalog) ReleaseCollection(collection int64) error {
return s.cli.RemoveWithPrefix(partitionsPrefix)
}

func (s Catalog) ReleasePartition(collection int64, partitions ...int64) error {
func (s Catalog) ReleasePartition(ctx context.Context, collection int64, partitions ...int64) error {
keys := lo.Map(partitions, func(partition int64, _ int) string {
return EncodePartitionLoadInfoKey(collection, partition)
})
Expand All @@ -235,12 +236,12 @@ func (s Catalog) ReleasePartition(collection int64, partitions ...int64) error {
return s.cli.MultiRemove(keys)
}

func (s Catalog) ReleaseReplicas(collectionID int64) error {
func (s Catalog) ReleaseReplicas(ctx context.Context, collectionID int64) error {
key := encodeCollectionReplicaKey(collectionID)
return s.cli.RemoveWithPrefix(key)
}

func (s Catalog) ReleaseReplica(collection int64, replicas ...int64) error {
func (s Catalog) ReleaseReplica(ctx context.Context, collection int64, replicas ...int64) error {
keys := lo.Map(replicas, func(replica int64, _ int) string {
return encodeReplicaKey(collection, replica)
})
Expand All @@ -262,7 +263,7 @@ func (s Catalog) ReleaseReplica(collection int64, replicas ...int64) error {
return s.cli.MultiRemove(keys)
}

func (s Catalog) SaveCollectionTargets(targets ...*querypb.CollectionTarget) error {
func (s Catalog) SaveCollectionTargets(ctx context.Context, targets ...*querypb.CollectionTarget) error {
kvs := make(map[string]string)
for _, target := range targets {
k := encodeCollectionTargetKey(target.GetCollectionID())
Expand All @@ -283,12 +284,12 @@ func (s Catalog) SaveCollectionTargets(targets ...*querypb.CollectionTarget) err
return nil
}

func (s Catalog) RemoveCollectionTarget(collectionID int64) error {
func (s Catalog) RemoveCollectionTarget(ctx context.Context, collectionID int64) error {
k := encodeCollectionTargetKey(collectionID)
return s.cli.Remove(k)
}

func (s Catalog) GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error) {
func (s Catalog) GetCollectionTargets(ctx context.Context) (map[int64]*querypb.CollectionTarget, error) {
keys, values, err := s.cli.LoadWithPrefix(CollectionTargetPrefix)
if err != nil {
return nil, err
Expand Down
Loading
Loading