Skip to content

Commit

Permalink
restapi truncate
Browse files Browse the repository at this point in the history
Signed-off-by: PowderLi <[email protected]>
  • Loading branch information
PowderLi committed Jan 6, 2024
1 parent 5be9099 commit 6ea8a76
Show file tree
Hide file tree
Showing 41 changed files with 19,991 additions and 5 deletions.
28 changes: 28 additions & 0 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"strconv"
"time"

"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func (m *meta) updateCollectionIndex(index *model.Index) {
Expand Down Expand Up @@ -200,6 +202,10 @@ func (m *meta) CreateIndex(index *model.Index) error {
m.Lock()
defer m.Unlock()

if m.isCollectionLocked(index.CollectionID) {
return fmt.Errorf("collection: %d is locked", index.CollectionID)
}

if err := m.catalog.CreateIndex(m.ctx, index); err != nil {
log.Error("meta update: CreateIndex save meta fail", zap.Int64("collectionID", index.CollectionID),
zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID),
Expand Down Expand Up @@ -676,6 +682,9 @@ func (m *meta) GetDeletedIndexes() []*model.Index {
func (m *meta) RemoveIndex(collID, indexID UniqueID) error {
m.Lock()
defer m.Unlock()
if m.isCollectionLocked(collID) {
return fmt.Errorf("collection: %d is locked", collID)
}
log.Info("IndexCoord meta table remove index", zap.Int64("collectionID", collID), zap.Int64("indexID", indexID))
err := m.catalog.DropIndex(m.ctx, collID, indexID)
if err != nil {
Expand All @@ -696,6 +705,25 @@ func (m *meta) RemoveIndex(collID, indexID UniqueID) error {
return nil
}

func (m *meta) isCollectionLocked(collID typeutil.UniqueID) bool {
lockTime, exists := m.lockedCollIDs[collID]
if exists && lockTime.Before(time.Now().Add(-5*time.Minute)) {
delete(m.lockedCollIDs, collID)
return false
}
return exists
}

func (m *meta) LockCollection(collID typeutil.UniqueID) error {
m.lockedCollIDs[collID] = time.Now()
return nil
}

func (m *meta) UnlockCollection(collID typeutil.UniqueID) error {
delete(m.lockedCollIDs, collID)
return nil
}

func (m *meta) CleanSegmentIndex(buildID UniqueID) (bool, *model.SegmentIndex) {
m.RLock()
defer m.RUnlock()
Expand Down
12 changes: 12 additions & 0 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,18 @@ func (s *Server) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (
return merr.Success(), nil
}

// DropIndex deletes indexes based on IndexName. One IndexName corresponds to the index of an entire column. A column is
// divided into many segments, and each segment corresponds to an IndexBuildID. DataCoord uses IndexBuildID to record
// index tasks.
func (s *Server) LockIndexes(ctx context.Context, req *indexpb.LockIndexesRequest) (*commonpb.Status, error) {
s.meta.LockCollection(req.CollectionID)
return merr.Status(nil), nil
}

func (s *Server) UnlockIndexes(ctx context.Context, req *indexpb.UnlockIndexesRequest) (*commonpb.Status, error) {
return merr.Status(nil), nil
}

// GetIndexInfos gets the index file paths for segment from DataCoord.
func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
log := log.Ctx(ctx).With(
Expand Down
3 changes: 3 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type meta struct {
channelCPs *typeutil.ConcurrentMap[string, *msgpb.MsgPosition] // vChannel -> channel checkpoint/see position
chunkManager storage.ChunkManager

lockedCollIDs map[typeutil.UniqueID]time.Time

// collectionIndexes records which indexes are on the collection
// collID -> indexID -> index
indexes map[UniqueID]map[UniqueID]*model.Index
Expand Down Expand Up @@ -94,6 +96,7 @@ func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManag
channelCPLocks: lock.NewKeyLock[string](),
channelCPs: typeutil.NewConcurrentMap[string, *msgpb.MsgPosition](),
chunkManager: chunkManager,
lockedCollIDs: make(map[UniqueID]time.Time),
indexes: make(map[UniqueID]map[UniqueID]*model.Index),
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
}
Expand Down
12 changes: 12 additions & 0 deletions internal/datacoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,18 @@ func (m *mockRootCoordClient) DropCollection(ctx context.Context, req *milvuspb.
panic("not implemented") // TODO: Implement
}

func (m *mockRootCoordClient) LockCollection(ctx context.Context, req *rootcoordpb.LockCollectionRequest, opts ...grpc.CallOption) (*rootcoordpb.LockCollectionResponse, error) {
panic("not implemented") // TODO: Implement
}

func (m *mockRootCoordClient) UnlockCollection(ctx context.Context, req *rootcoordpb.UnlockCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}

func (m *mockRootCoordClient) SwitchCollection(ctx context.Context, req *rootcoordpb.SwitchCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}

func (m *mockRootCoordClient) HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error) {
panic("not implemented") // TODO: Implement
}
Expand Down
12 changes: 12 additions & 0 deletions internal/distributed/datacoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,18 @@ func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest, o
})
}

func (c *Client) LockIndexes(ctx context.Context, req *indexpb.LockIndexesRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.LockIndexes(ctx, req)
})
}

func (c *Client) UnlockIndexes(ctx context.Context, req *indexpb.UnlockIndexesRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.UnlockIndexes(ctx, req)
})
}

func (c *Client) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDataNodeTtMsgsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.ReportDataNodeTtMsgs(ctx, req)
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,16 @@ func (s *Server) DropIndex(ctx context.Context, request *indexpb.DropIndexReques
return s.dataCoord.DropIndex(ctx, request)
}

// LockIndexes sends the drop index request to DataCoord.
func (s *Server) LockIndexes(ctx context.Context, request *indexpb.LockIndexesRequest) (*commonpb.Status, error) {
return s.LockIndexes(ctx, request)
}

// UnlockIndexes sends the drop index request to DataCoord.
func (s *Server) UnlockIndexes(ctx context.Context, request *indexpb.UnlockIndexesRequest) (*commonpb.Status, error) {
return s.UnlockIndexes(ctx, request)
}

// Deprecated: use DescribeIndex instead
func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) {
return s.dataCoord.GetIndexBuildProgress(ctx, req)
Expand Down
1 change: 1 addition & 0 deletions internal/distributed/proxy/httpserver/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const (
VectorCollectionsPath = "/vector/collections"
VectorCollectionsCreatePath = "/vector/collections/create"
VectorCollectionsDescribePath = "/vector/collections/describe"
VectorCollectionsTruncatePath = "/vector/collections/truncate"
VectorCollectionsDropPath = "/vector/collections/drop"
VectorInsertPath = "/vector/insert"
VectorUpsertPath = "/vector/upsert"
Expand Down
4 changes: 4 additions & 0 deletions internal/distributed/proxy/httpserver/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func (m *mockProxyComponent) DropCollection(ctx context.Context, request *milvus
return testStatus, nil
}

func (m *mockProxyComponent) TruncateCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
return testStatus, nil
}

func (m *mockProxyComponent) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
return &milvuspb.BoolResponse{Status: testStatus}, nil
}
Expand Down
54 changes: 54 additions & 0 deletions internal/distributed/proxy/httpserver/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (h *HandlersV1) RegisterRoutesToV1(router gin.IRouter) {
router.GET(VectorCollectionsPath, h.listCollections)
router.POST(VectorCollectionsCreatePath, h.createCollection)
router.GET(VectorCollectionsDescribePath, h.getCollectionDetails)
router.POST(VectorCollectionsTruncatePath, h.truncateCollection)
router.POST(VectorCollectionsDropPath, h.dropCollection)
router.POST(VectorQueryPath, h.query)
router.POST(VectorGetPath, h.get)
Expand Down Expand Up @@ -946,3 +947,56 @@ func (h *HandlersV1) search(c *gin.Context) {
}
}
}

func (h *HandlersV1) truncateCollection(c *gin.Context) {
httpReq := DropCollectionReq{
DbName: DefaultDbName,
}
if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of drop collection is incorrect", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return
}
if httpReq.CollectionName == "" {
log.Warn("high level restful api, drop collection require parameter: [collectionName], but miss")
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName]",
})
return
}
req := &milvuspb.DropCollectionRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
}
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
has, err := h.hasCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil {
return nil, RestRequestInterceptorErr
}
if !has {
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCollectionNotFound),
HTTPReturnMessage: merr.ErrCollectionNotFound.Error() + ", database: " + httpReq.DbName + ", collection: " + httpReq.CollectionName,
})
return nil, RestRequestInterceptorErr
}
return h.proxy.TruncateCollection(reqCtx, req.(*milvuspb.DropCollectionRequest))
})
if err == RestRequestInterceptorErr {
return
}
if err == nil {
err = merr.Error(response.(*commonpb.Status))
}
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}
}
4 changes: 4 additions & 0 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,10 @@ func (s *Server) DropCollection(ctx context.Context, request *milvuspb.DropColle
return s.proxy.DropCollection(ctx, request)
}

func (s *Server) TruncateCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
return s.proxy.TruncateCollection(ctx, request)
}

// HasCollection notifies Proxy to check a collection's existence at specified timestamp
func (s *Server) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
return s.proxy.HasCollection(ctx, request)
Expand Down
35 changes: 35 additions & 0 deletions internal/distributed/rootcoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,41 @@ func (c *Client) DropCollection(ctx context.Context, in *milvuspb.DropCollection
})
}

// LockCollection lock collection
func (c *Client) LockCollection(ctx context.Context, in *rootcoordpb.LockCollectionRequest, opts ...grpc.CallOption) (*rootcoordpb.LockCollectionResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*rootcoordpb.LockCollectionResponse, error) {
return client.LockCollection(ctx, in)
})
}

// UnlockCollection unlock collection
func (c *Client) UnlockCollection(ctx context.Context, in *rootcoordpb.UnlockCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
return client.UnlockCollection(ctx, in)
})
}

func (c *Client) SwitchCollection(ctx context.Context, req *rootcoordpb.SwitchCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
return client.SwitchCollection(ctx, req)
})
}

// HasCollection check collection existence
func (c *Client) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest, opts ...grpc.CallOption) (*milvuspb.BoolResponse, error) {
in = typeutil.Clone(in)
Expand Down
14 changes: 14 additions & 0 deletions internal/distributed/rootcoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,20 @@ func (s *Server) DropCollection(ctx context.Context, in *milvuspb.DropCollection
return s.rootCoord.DropCollection(ctx, in)
}

// LockCollection lock a collection
func (s *Server) LockCollection(ctx context.Context, in *rootcoordpb.LockCollectionRequest) (*rootcoordpb.LockCollectionResponse, error) {
return s.rootCoord.LockCollection(ctx, in)
}

// UnlockCollection unlock a collection
func (s *Server) UnlockCollection(ctx context.Context, in *rootcoordpb.UnlockCollectionRequest) (*commonpb.Status, error) {
return s.rootCoord.UnlockCollection(ctx, in)
}

func (s *Server) SwitchCollection(ctx context.Context, request *rootcoordpb.SwitchCollectionRequest) (*commonpb.Status, error) {
return s.rootCoord.SwitchCollection(ctx, request)
}

// HasCollection checks whether a collection is created
func (s *Server) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
return s.rootCoord.HasCollection(ctx, in)
Expand Down
2 changes: 2 additions & 0 deletions internal/metastore/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type RootCoordCatalog interface {
DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error
AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, alterType AlterType, ts typeutil.Timestamp) error

SaveMultiCollectionInfo(ctx context.Context, collections []*model.Collection, aliases []*model.Alias, ts typeutil.Timestamp) error

CreatePartition(ctx context.Context, dbID int64, partition *model.Partition, ts typeutil.Timestamp) error
DropPartition(ctx context.Context, dbID int64, collectionID typeutil.UniqueID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) error
AlterPartition(ctx context.Context, dbID int64, oldPart *model.Partition, newPart *model.Partition, alterType AlterType, ts typeutil.Timestamp) error
Expand Down
31 changes: 31 additions & 0 deletions internal/metastore/kv/rootcoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,37 @@ func (kc *Catalog) AlterCollection(ctx context.Context, oldColl *model.Collectio
return fmt.Errorf("altering collection doesn't support %s", alterType.String())
}

func (kc *Catalog) SaveMultiCollectionInfo(ctx context.Context, collections []*model.Collection, aliases []*model.Alias, ts typeutil.Timestamp) error {
if len(collections) == 0 {
return nil
}
kvMap := make(map[string]string, 0)
toRemoveKeys := make([]string, 0)
for _, alias := range aliases {
oldKBefore210 := BuildAliasKey210(alias.Name)
oldKeyWithoutDb := BuildAliasKey(alias.Name)
toRemoveKeys = append(toRemoveKeys, oldKBefore210, oldKeyWithoutDb)
k := BuildAliasKeyWithDB(alias.DbID, alias.Name)
aliasInfo := model.MarshalAliasModel(alias)
v, err := proto.Marshal(aliasInfo)
if err != nil {
return err
}
kvMap[k] = string(v)
}
for _, collection := range collections {
key := BuildCollectionKey(collection.DBID, collection.CollectionID)
collInfo := model.MarshalCollectionModel(collection)
value, err := proto.Marshal(collInfo)
if err != nil {
return fmt.Errorf("failed to marshal collection(%s) info: %s", collection.Name, err.Error())
}
kvMap[key] = string(value)
}

return kc.Snapshot.MultiSaveAndRemoveWithPrefix(kvMap, toRemoveKeys, ts)
}

func (kc *Catalog) alterModifyPartition(oldPart *model.Partition, newPart *model.Partition, ts typeutil.Timestamp) error {
if oldPart.CollectionID != newPart.CollectionID || oldPart.PartitionID != newPart.PartitionID {
return fmt.Errorf("altering collection id or partition id is forbidden")
Expand Down
6 changes: 6 additions & 0 deletions internal/metastore/mocks/mock_rootcoord_catalog.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions internal/mocks/mock_datacoord_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6ea8a76

Please sign in to comment.