Skip to content

Commit

Permalink
enhance: refine querycoord meta/catalog related interfaces to ensure …
Browse files Browse the repository at this point in the history
…that each method includes a ctx parameter (milvus-io#37916)

issue: milvus-io#35917 
This PR refine the querycoord meta related interfaces to ensure that
each method includes a ctx parameter.

Signed-off-by: tinswzy <[email protected]>
  • Loading branch information
tinswzy authored and JsDove committed Nov 26, 2024
1 parent 3d9c600 commit 3e87f68
Show file tree
Hide file tree
Showing 66 changed files with 2,667 additions and 2,353 deletions.
34 changes: 17 additions & 17 deletions internal/metastore/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,23 @@ type DataCoordCatalog interface {
}

type QueryCoordCatalog interface {
SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error
SavePartition(info ...*querypb.PartitionLoadInfo) error
SaveReplica(replicas ...*querypb.Replica) error
GetCollections() ([]*querypb.CollectionLoadInfo, error)
GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error)
GetReplicas() ([]*querypb.Replica, error)
ReleaseCollection(collection int64) error
ReleasePartition(collection int64, partitions ...int64) error
ReleaseReplicas(collectionID int64) error
ReleaseReplica(collection int64, replicas ...int64) error
SaveResourceGroup(rgs ...*querypb.ResourceGroup) error
RemoveResourceGroup(rgName string) error
GetResourceGroups() ([]*querypb.ResourceGroup, error)

SaveCollectionTargets(target ...*querypb.CollectionTarget) error
RemoveCollectionTarget(collectionID int64) error
GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error)
SaveCollection(ctx context.Context, collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error
SavePartition(ctx context.Context, info ...*querypb.PartitionLoadInfo) error
SaveReplica(ctx context.Context, replicas ...*querypb.Replica) error
GetCollections(ctx context.Context) ([]*querypb.CollectionLoadInfo, error)
GetPartitions(ctx context.Context) (map[int64][]*querypb.PartitionLoadInfo, error)
GetReplicas(ctx context.Context) ([]*querypb.Replica, error)
ReleaseCollection(ctx context.Context, collection int64) error
ReleasePartition(ctx context.Context, collection int64, partitions ...int64) error
ReleaseReplicas(ctx context.Context, collectionID int64) error
ReleaseReplica(ctx context.Context, collection int64, replicas ...int64) error
SaveResourceGroup(ctx context.Context, rgs ...*querypb.ResourceGroup) error
RemoveResourceGroup(ctx context.Context, rgName string) error
GetResourceGroups(ctx context.Context) ([]*querypb.ResourceGroup, error)

SaveCollectionTargets(ctx context.Context, target ...*querypb.CollectionTarget) error
RemoveCollectionTarget(ctx context.Context, collectionID int64) error
GetCollectionTargets(ctx context.Context) (map[int64]*querypb.CollectionTarget, error)
}

// StreamingCoordCataLog is the interface for streamingcoord catalog
Expand Down
39 changes: 20 additions & 19 deletions internal/metastore/kv/querycoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package querycoord

import (
"bytes"
"context"
"fmt"
"io"

Expand Down Expand Up @@ -42,7 +43,7 @@ func NewCatalog(cli kv.MetaKv) Catalog {
}
}

func (s Catalog) SaveCollection(collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error {
func (s Catalog) SaveCollection(ctx context.Context, collection *querypb.CollectionLoadInfo, partitions ...*querypb.PartitionLoadInfo) error {
k := EncodeCollectionLoadInfoKey(collection.GetCollectionID())
v, err := proto.Marshal(collection)
if err != nil {
Expand All @@ -52,10 +53,10 @@ func (s Catalog) SaveCollection(collection *querypb.CollectionLoadInfo, partitio
if err != nil {
return err
}
return s.SavePartition(partitions...)
return s.SavePartition(ctx, partitions...)
}

func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error {
func (s Catalog) SavePartition(ctx context.Context, info ...*querypb.PartitionLoadInfo) error {
for _, partition := range info {
k := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID())
v, err := proto.Marshal(partition)
Expand All @@ -70,7 +71,7 @@ func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error {
return nil
}

func (s Catalog) SaveReplica(replicas ...*querypb.Replica) error {
func (s Catalog) SaveReplica(ctx context.Context, replicas ...*querypb.Replica) error {
kvs := make(map[string]string)
for _, replica := range replicas {
key := encodeReplicaKey(replica.GetCollectionID(), replica.GetID())
Expand All @@ -83,7 +84,7 @@ func (s Catalog) SaveReplica(replicas ...*querypb.Replica) error {
return s.cli.MultiSave(kvs)
}

func (s Catalog) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error {
func (s Catalog) SaveResourceGroup(ctx context.Context, rgs ...*querypb.ResourceGroup) error {
ret := make(map[string]string)
for _, rg := range rgs {
key := encodeResourceGroupKey(rg.GetName())
Expand All @@ -98,12 +99,12 @@ func (s Catalog) SaveResourceGroup(rgs ...*querypb.ResourceGroup) error {
return s.cli.MultiSave(ret)
}

func (s Catalog) RemoveResourceGroup(rgName string) error {
func (s Catalog) RemoveResourceGroup(ctx context.Context, rgName string) error {
key := encodeResourceGroupKey(rgName)
return s.cli.Remove(key)
}

func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
func (s Catalog) GetCollections(ctx context.Context) ([]*querypb.CollectionLoadInfo, error) {
_, values, err := s.cli.LoadWithPrefix(CollectionLoadInfoPrefix)
if err != nil {
return nil, err
Expand All @@ -120,7 +121,7 @@ func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) {
return ret, nil
}

func (s Catalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) {
func (s Catalog) GetPartitions(ctx context.Context) (map[int64][]*querypb.PartitionLoadInfo, error) {
_, values, err := s.cli.LoadWithPrefix(PartitionLoadInfoPrefix)
if err != nil {
return nil, err
Expand All @@ -137,7 +138,7 @@ func (s Catalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error)
return ret, nil
}

func (s Catalog) GetReplicas() ([]*querypb.Replica, error) {
func (s Catalog) GetReplicas(ctx context.Context) ([]*querypb.Replica, error) {
_, values, err := s.cli.LoadWithPrefix(ReplicaPrefix)
if err != nil {
return nil, err
Expand All @@ -151,7 +152,7 @@ func (s Catalog) GetReplicas() ([]*querypb.Replica, error) {
ret = append(ret, &info)
}

replicasV1, err := s.getReplicasFromV1()
replicasV1, err := s.getReplicasFromV1(ctx)
if err != nil {
return nil, err
}
Expand All @@ -160,7 +161,7 @@ func (s Catalog) GetReplicas() ([]*querypb.Replica, error) {
return ret, nil
}

func (s Catalog) getReplicasFromV1() ([]*querypb.Replica, error) {
func (s Catalog) getReplicasFromV1(ctx context.Context) ([]*querypb.Replica, error) {
_, replicaValues, err := s.cli.LoadWithPrefix(ReplicaMetaPrefixV1)
if err != nil {
return nil, err
Expand All @@ -183,7 +184,7 @@ func (s Catalog) getReplicasFromV1() ([]*querypb.Replica, error) {
return ret, nil
}

func (s Catalog) GetResourceGroups() ([]*querypb.ResourceGroup, error) {
func (s Catalog) GetResourceGroups(ctx context.Context) ([]*querypb.ResourceGroup, error) {
_, rgs, err := s.cli.LoadWithPrefix(ResourceGroupPrefix)
if err != nil {
return nil, err
Expand All @@ -202,7 +203,7 @@ func (s Catalog) GetResourceGroups() ([]*querypb.ResourceGroup, error) {
return ret, nil
}

func (s Catalog) ReleaseCollection(collection int64) error {
func (s Catalog) ReleaseCollection(ctx context.Context, collection int64) error {
// remove collection and obtained partitions
collectionKey := EncodeCollectionLoadInfoKey(collection)
err := s.cli.Remove(collectionKey)
Expand All @@ -213,7 +214,7 @@ func (s Catalog) ReleaseCollection(collection int64) error {
return s.cli.RemoveWithPrefix(partitionsPrefix)
}

func (s Catalog) ReleasePartition(collection int64, partitions ...int64) error {
func (s Catalog) ReleasePartition(ctx context.Context, collection int64, partitions ...int64) error {
keys := lo.Map(partitions, func(partition int64, _ int) string {
return EncodePartitionLoadInfoKey(collection, partition)
})
Expand All @@ -235,12 +236,12 @@ func (s Catalog) ReleasePartition(collection int64, partitions ...int64) error {
return s.cli.MultiRemove(keys)
}

func (s Catalog) ReleaseReplicas(collectionID int64) error {
func (s Catalog) ReleaseReplicas(ctx context.Context, collectionID int64) error {
key := encodeCollectionReplicaKey(collectionID)
return s.cli.RemoveWithPrefix(key)
}

func (s Catalog) ReleaseReplica(collection int64, replicas ...int64) error {
func (s Catalog) ReleaseReplica(ctx context.Context, collection int64, replicas ...int64) error {
keys := lo.Map(replicas, func(replica int64, _ int) string {
return encodeReplicaKey(collection, replica)
})
Expand All @@ -262,7 +263,7 @@ func (s Catalog) ReleaseReplica(collection int64, replicas ...int64) error {
return s.cli.MultiRemove(keys)
}

func (s Catalog) SaveCollectionTargets(targets ...*querypb.CollectionTarget) error {
func (s Catalog) SaveCollectionTargets(ctx context.Context, targets ...*querypb.CollectionTarget) error {
kvs := make(map[string]string)
for _, target := range targets {
k := encodeCollectionTargetKey(target.GetCollectionID())
Expand All @@ -283,12 +284,12 @@ func (s Catalog) SaveCollectionTargets(targets ...*querypb.CollectionTarget) err
return nil
}

func (s Catalog) RemoveCollectionTarget(collectionID int64) error {
func (s Catalog) RemoveCollectionTarget(ctx context.Context, collectionID int64) error {
k := encodeCollectionTargetKey(collectionID)
return s.cli.Remove(k)
}

func (s Catalog) GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error) {
func (s Catalog) GetCollectionTargets(ctx context.Context) (map[int64]*querypb.CollectionTarget, error) {
keys, values, err := s.cli.LoadWithPrefix(CollectionTargetPrefix)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 3e87f68

Please sign in to comment.