diff --git a/go.mod b/go.mod index a6cc1720ab1b7..b93efd8f497bd 100644 --- a/go.mod +++ b/go.mod @@ -23,8 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.16.7 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f - github.com/milvus-io/milvus/pkg v0.0.1 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c github.com/minio/minio-go/v7 v7.0.61 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 @@ -62,6 +61,7 @@ require github.com/apache/arrow/go/v12 v12.0.1 require github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 require ( + github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/quasilyte/go-ruleguard/dsl v0.3.22 golang.org/x/net v0.17.0 diff --git a/go.sum b/go.sum index 5c3cc3d9b8c06..47b801aa4b6fa 100644 --- a/go.sum +++ b/go.sum @@ -581,10 +581,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e h1:IH1WAXwEF8vbwahPdupi4zzRNWViT4B7fZzIjtRLpG4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f h1:0cAMN9OsgBxlEUY8i1e1ocrBZ/cpu/Kdguz4JWz9fUc= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c h1:Wbc2IZt/13+B5jc8JPU/dOxGYy+1jeOsChVgcza+qgw= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 h1:UYJ7JB+QlMOoFHNdd8mUa3/lV63t9dnBX7ILXmEEWPY= github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index f1fc55391ac3d..1a0f532576f3c 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -18,6 +18,7 @@ package datacoord import ( + "context" "fmt" "strconv" @@ -211,6 +212,22 @@ func (m *meta) CreateIndex(index *model.Index) error { return nil } +func (m *meta) AlterIndex(ctx context.Context, indexes ...*model.Index) error { + m.Lock() + defer m.Unlock() + + err := m.catalog.AlterIndexes(ctx, indexes) + if err != nil { + return err + } + + for _, index := range indexes { + m.updateCollectionIndex(index) + } + + return nil +} + // AddSegmentIndex adds the index meta corresponding the indexBuildID to meta table. func (m *meta) AddSegmentIndex(segIndex *model.SegmentIndex) error { m.Lock() diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 680aea7ce57a1..c5f3995ef5626 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -236,6 +237,51 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques return merr.Success(), nil } +func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) (*commonpb.Status, error) { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", req.GetCollectionID()), + zap.String("indexName", req.GetIndexName()), + ) + log.Info("received AlterIndex request", zap.Any("params", req.GetParams())) + + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) + return merr.Status(err), nil + } + + indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName()) + params := make(map[string]string) + for _, index := range indexes { + for _, param := range index.UserIndexParams { + params[param.GetKey()] = param.GetValue() + } + + // update the index params + for _, param := range req.GetParams() { + params[param.GetKey()] = param.GetValue() + } + + log.Info("prepare to alter index", + zap.String("indexName", index.IndexName), + zap.Any("params", params), + ) + index.UserIndexParams = lo.MapToSlice(params, func(k string, v string) *commonpb.KeyValuePair { + return &commonpb.KeyValuePair{ + Key: k, + Value: v, + } + }) + } + + err := s.meta.AlterIndex(ctx, indexes...) + if err != nil { + log.Warn("failed to alter index", zap.Error(err)) + return merr.Status(err), nil + } + + return merr.Success(), nil +} + // GetIndexState gets the index state of the index name in the request from Proxy. // Deprecated func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) { diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index c4cb500ab8712..8eac0e71f8e0d 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -237,6 +237,362 @@ func TestServer_CreateIndex(t *testing.T) { }) } +func TestServer_AlterIndex(t *testing.T) { + var ( + collID = UniqueID(1) + partID = UniqueID(2) + fieldID = UniqueID(10) + indexID = UniqueID(100) + segID = UniqueID(1000) + invalidSegID = UniqueID(1001) + buildID = UniqueID(10000) + indexName = "default_idx" + typeParams = []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + } + indexParams = []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "IVF_FLAT", + }, + } + createTS = uint64(1000) + ctx = context.Background() + req = &indexpb.AlterIndexRequest{ + CollectionID: collID, + IndexName: "default_idx", + Params: []*commonpb.KeyValuePair{{ + Key: common.MmapEnabledKey, + Value: "true", + }}, + } + ) + + catalog := catalogmocks.NewDataCoordCatalog(t) + catalog.On("AlterIndexes", + mock.Anything, + mock.Anything, + ).Return(nil) + + s := &Server{ + meta: &meta{ + catalog: catalog, + indexes: map[UniqueID]map[UniqueID]*model.Index{ + collID: { + // finished + indexID: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: indexName, + IsDeleted: false, + CreateTime: createTS, + TypeParams: typeParams, + IndexParams: indexParams, + IsAutoIndex: false, + UserIndexParams: nil, + }, + // deleted + indexID + 1: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 1, + IndexID: indexID + 1, + IndexName: indexName + "_1", + IsDeleted: true, + CreateTime: createTS, + TypeParams: typeParams, + IndexParams: indexParams, + IsAutoIndex: false, + UserIndexParams: nil, + }, + // unissued + indexID + 2: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 2, + IndexID: indexID + 2, + IndexName: indexName + "_2", + IsDeleted: false, + CreateTime: createTS, + TypeParams: typeParams, + IndexParams: indexParams, + IsAutoIndex: false, + UserIndexParams: nil, + }, + // inProgress + indexID + 3: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 3, + IndexID: indexID + 3, + IndexName: indexName + "_3", + IsDeleted: false, + CreateTime: createTS, + TypeParams: typeParams, + IndexParams: indexParams, + IsAutoIndex: false, + UserIndexParams: nil, + }, + // failed + indexID + 4: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 4, + IndexID: indexID + 4, + IndexName: indexName + "_4", + IsDeleted: false, + CreateTime: createTS, + TypeParams: typeParams, + IndexParams: indexParams, + IsAutoIndex: false, + UserIndexParams: nil, + }, + // unissued + indexID + 5: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID + 5, + IndexID: indexID + 5, + IndexName: indexName + "_5", + IsDeleted: false, + CreateTime: createTS, + TypeParams: typeParams, + IndexParams: indexParams, + IsAutoIndex: false, + UserIndexParams: nil, + }, + }, + }, + segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{ + invalidSegID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: invalidSegID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + // timesamp > index start time, will be filtered out + Timestamp: createTS + 1, + }, + }, + }, + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, + CreatedByCompaction: true, + CompactionFrom: []int64{segID - 1}, + }, + segmentIndexes: map[UniqueID]*model.SegmentIndex{ + indexID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: createTS, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }, + indexID + 1: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 1, + BuildID: buildID + 1, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: createTS, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }, + indexID + 3: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 3, + BuildID: buildID + 3, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_InProgress, + FailReason: "", + IsDeleted: false, + CreateTime: createTS, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }, + indexID + 4: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 4, + BuildID: buildID + 4, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Failed, + FailReason: "mock failed", + IsDeleted: false, + CreateTime: createTS, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }, + indexID + 5: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 5, + BuildID: buildID + 5, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: createTS, + IndexFileKeys: nil, + IndexSize: 0, + WriteHandoff: false, + }, + }, + }, + segID - 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, + }, + segmentIndexes: map[UniqueID]*model.SegmentIndex{ + indexID: { + SegmentID: segID - 1, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + CreateTime: createTS, + }, + indexID + 1: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 1, + BuildID: buildID + 1, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + CreateTime: createTS, + }, + indexID + 3: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 3, + BuildID: buildID + 3, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_InProgress, + CreateTime: createTS, + }, + indexID + 4: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 4, + BuildID: buildID + 4, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Failed, + FailReason: "mock failed", + CreateTime: createTS, + }, + indexID + 5: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: 10000, + IndexID: indexID + 5, + BuildID: buildID + 5, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + CreateTime: createTS, + }, + }, + }, + }}, + }, + allocator: newMockAllocator(), + notifyIndexChan: make(chan UniqueID, 1), + } + + t.Run("server not available", func(t *testing.T) { + s.stateCode.Store(commonpb.StateCode_Initializing) + resp, err := s.AlterIndex(ctx, req) + assert.NoError(t, err) + assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady) + }) + + s.stateCode.Store(commonpb.StateCode_Healthy) + + t.Run("success", func(t *testing.T) { + resp, err := s.AlterIndex(ctx, req) + assert.NoError(t, merr.CheckRPCCall(resp, err)) + + describeResp, err := s.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ + CollectionID: collID, + IndexName: "default_idx", + Timestamp: createTS, + }) + assert.NoError(t, merr.CheckRPCCall(describeResp, err)) + assert.True(t, common.IsMmapEnabled(describeResp.IndexInfos[0].GetUserIndexParams()...), "indexInfo: %+v", describeResp.IndexInfos[0]) + }) +} + func TestServer_GetIndexState(t *testing.T) { var ( collID = UniqueID(1) diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index c52fd3b8232a9..925cb59b75740 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -564,6 +564,13 @@ func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques }) } +// AlterIndex sends the alter index request to IndexCoord. +func (c *Client) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) { + return client.AlterIndex(ctx, req) + }) +} + // GetIndexState gets the index states from IndexCoord. func (c *Client) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest, opts ...grpc.CallOption) (*indexpb.GetIndexStateResponse, error) { return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStateResponse, error) { diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index fc8d21ecd094f..f5ff6de90c5e1 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -436,6 +436,10 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques return s.dataCoord.CreateIndex(ctx, req) } +func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) (*commonpb.Status, error) { + return s.dataCoord.AlterIndex(ctx, req) +} + // GetIndexState gets the index states from DataCoord. // Deprecated: use DescribeIndex instead func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) { diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index ce94260eba397..766dae2cddc39 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -821,9 +821,9 @@ func (s *Server) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexR return s.proxy.CreateIndex(ctx, request) } +// AlterIndex notifies Proxy to alter index func (s *Server) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexRequest) (*commonpb.Status, error) { - // Todo - return nil, nil + return s.proxy.AlterIndex(ctx, request) } // DropIndex notifies Proxy to drop index @@ -871,8 +871,7 @@ func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (* } func (s *Server) SearchV2(ctx context.Context, request *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error) { - // Todo - return nil, nil + return s.proxy.SearchV2(ctx, request) } func (s *Server) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) { diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index ee028a83e6882..8adf2d7a0390b 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -327,6 +327,11 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error { for _, kvPair := range it.req.GetIndexParams() { key, value := kvPair.GetKey(), kvPair.GetValue() + // knowhere would report error if encountered the unknown key, + // so skip this + if key == common.MmapEnabledKey { + continue + } indexParams[key] = value } it.newTypeParams = typeParams diff --git a/internal/mocks/mock_datacoord.go b/internal/mocks/mock_datacoord.go index abd562a0771bf..dca83cf5142ea 100644 --- a/internal/mocks/mock_datacoord.go +++ b/internal/mocks/mock_datacoord.go @@ -36,6 +36,61 @@ func (_m *MockDataCoord) EXPECT() *MockDataCoord_Expecter { return &MockDataCoord_Expecter{mock: &_m.Mock} } +// AlterIndex provides a mock function with given fields: _a0, _a1 +func (_m *MockDataCoord) AlterIndex(_a0 context.Context, _a1 *indexpb.AlterIndexRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.AlterIndexRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.AlterIndexRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *indexpb.AlterIndexRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataCoord_AlterIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterIndex' +type MockDataCoord_AlterIndex_Call struct { + *mock.Call +} + +// AlterIndex is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *indexpb.AlterIndexRequest +func (_e *MockDataCoord_Expecter) AlterIndex(_a0 interface{}, _a1 interface{}) *MockDataCoord_AlterIndex_Call { + return &MockDataCoord_AlterIndex_Call{Call: _e.mock.On("AlterIndex", _a0, _a1)} +} + +func (_c *MockDataCoord_AlterIndex_Call) Run(run func(_a0 context.Context, _a1 *indexpb.AlterIndexRequest)) *MockDataCoord_AlterIndex_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*indexpb.AlterIndexRequest)) + }) + return _c +} + +func (_c *MockDataCoord_AlterIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataCoord_AlterIndex_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataCoord_AlterIndex_Call) RunAndReturn(run func(context.Context, *indexpb.AlterIndexRequest) (*commonpb.Status, error)) *MockDataCoord_AlterIndex_Call { + _c.Call.Return(run) + return _c +} + // AssignSegmentID provides a mock function with given fields: _a0, _a1 func (_m *MockDataCoord) AssignSegmentID(_a0 context.Context, _a1 *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_datacoord_client.go b/internal/mocks/mock_datacoord_client.go index 8913a61e53f07..8c37d5ba5edad 100644 --- a/internal/mocks/mock_datacoord_client.go +++ b/internal/mocks/mock_datacoord_client.go @@ -33,6 +33,76 @@ func (_m *MockDataCoordClient) EXPECT() *MockDataCoordClient_Expecter { return &MockDataCoordClient_Expecter{mock: &_m.Mock} } +// AlterIndex provides a mock function with given fields: ctx, in, opts +func (_m *MockDataCoordClient) AlterIndex(ctx context.Context, in *indexpb.AlterIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.AlterIndexRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *indexpb.AlterIndexRequest, ...grpc.CallOption) *commonpb.Status); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *indexpb.AlterIndexRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataCoordClient_AlterIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterIndex' +type MockDataCoordClient_AlterIndex_Call struct { + *mock.Call +} + +// AlterIndex is a helper method to define mock.On call +// - ctx context.Context +// - in *indexpb.AlterIndexRequest +// - opts ...grpc.CallOption +func (_e *MockDataCoordClient_Expecter) AlterIndex(ctx interface{}, in interface{}, opts ...interface{}) *MockDataCoordClient_AlterIndex_Call { + return &MockDataCoordClient_AlterIndex_Call{Call: _e.mock.On("AlterIndex", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockDataCoordClient_AlterIndex_Call) Run(run func(ctx context.Context, in *indexpb.AlterIndexRequest, opts ...grpc.CallOption)) *MockDataCoordClient_AlterIndex_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*indexpb.AlterIndexRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockDataCoordClient_AlterIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataCoordClient_AlterIndex_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataCoordClient_AlterIndex_Call) RunAndReturn(run func(context.Context, *indexpb.AlterIndexRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockDataCoordClient_AlterIndex_Call { + _c.Call.Return(run) + return _c +} + // AssignSegmentID provides a mock function with given fields: ctx, in, opts func (_m *MockDataCoordClient) AssignSegmentID(ctx context.Context, in *datapb.AssignSegmentIDRequest, opts ...grpc.CallOption) (*datapb.AssignSegmentIDResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/mocks/mock_proxy.go b/internal/mocks/mock_proxy.go index ce1e9e1353f36..1186f90632e8c 100644 --- a/internal/mocks/mock_proxy.go +++ b/internal/mocks/mock_proxy.go @@ -199,6 +199,61 @@ func (_c *MockProxy_AlterCollection_Call) RunAndReturn(run func(context.Context, return _c } +// AlterIndex provides a mock function with given fields: _a0, _a1 +func (_m *MockProxy) AlterIndex(_a0 context.Context, _a1 *milvuspb.AlterIndexRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterIndexRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterIndexRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterIndexRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockProxy_AlterIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterIndex' +type MockProxy_AlterIndex_Call struct { + *mock.Call +} + +// AlterIndex is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.AlterIndexRequest +func (_e *MockProxy_Expecter) AlterIndex(_a0 interface{}, _a1 interface{}) *MockProxy_AlterIndex_Call { + return &MockProxy_AlterIndex_Call{Call: _e.mock.On("AlterIndex", _a0, _a1)} +} + +func (_c *MockProxy_AlterIndex_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.AlterIndexRequest)) *MockProxy_AlterIndex_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.AlterIndexRequest)) + }) + return _c +} + +func (_c *MockProxy_AlterIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *MockProxy_AlterIndex_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockProxy_AlterIndex_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterIndexRequest) (*commonpb.Status, error)) *MockProxy_AlterIndex_Call { + _c.Call.Return(run) + return _c +} + // CalcDistance provides a mock function with given fields: _a0, _a1 func (_m *MockProxy) CalcDistance(_a0 context.Context, _a1 *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) { ret := _m.Called(_a0, _a1) @@ -639,11 +694,6 @@ func (_c *MockProxy_CreateIndex_Call) RunAndReturn(run func(context.Context, *mi return _c } -func (_m *MockProxy) AlterIndex(_a0 context.Context, _a1 *milvuspb.AlterIndexRequest) (*commonpb.Status, error) { - // Todo - return nil, nil -} - // CreatePartition provides a mock function with given fields: _a0, _a1 func (_m *MockProxy) CreatePartition(_a0 context.Context, _a1 *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) @@ -4560,9 +4610,59 @@ func (_c *MockProxy_Search_Call) RunAndReturn(run func(context.Context, *milvusp return _c } +// SearchV2 provides a mock function with given fields: _a0, _a1 func (_m *MockProxy) SearchV2(_a0 context.Context, _a1 *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error) { - // Todo - return nil, nil + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.SearchResults + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.SearchRequestV2) *milvuspb.SearchResults); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.SearchResults) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.SearchRequestV2) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockProxy_SearchV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SearchV2' +type MockProxy_SearchV2_Call struct { + *mock.Call +} + +// SearchV2 is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.SearchRequestV2 +func (_e *MockProxy_Expecter) SearchV2(_a0 interface{}, _a1 interface{}) *MockProxy_SearchV2_Call { + return &MockProxy_SearchV2_Call{Call: _e.mock.On("SearchV2", _a0, _a1)} +} + +func (_c *MockProxy_SearchV2_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.SearchRequestV2)) *MockProxy_SearchV2_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.SearchRequestV2)) + }) + return _c +} + +func (_c *MockProxy_SearchV2_Call) Return(_a0 *milvuspb.SearchResults, _a1 error) *MockProxy_SearchV2_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockProxy_SearchV2_Call) RunAndReturn(run func(context.Context, *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error)) *MockProxy_SearchV2_Call { + _c.Call.Return(run) + return _c } // SelectGrant provides a mock function with given fields: _a0, _a1 diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index c896d32f2e6e5..17e43615beafe 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -78,6 +78,7 @@ service DataCoord { rpc CheckHealth(milvus.CheckHealthRequest) returns (milvus.CheckHealthResponse) {} rpc CreateIndex(index.CreateIndexRequest) returns (common.Status){} + rpc AlterIndex(index.AlterIndexRequest) returns (common.Status){} // Deprecated: use DescribeIndex instead rpc GetIndexState(index.GetIndexStateRequest) returns (index.GetIndexStateResponse) {} rpc GetSegmentIndexState(index.GetSegmentIndexStateRequest) returns (index.GetSegmentIndexStateResponse) {} diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index 6b748784f5502..0d0fc1348dd00 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -10,33 +10,19 @@ import "milvus.proto"; import "schema.proto"; service IndexCoord { - rpc GetComponentStates(milvus.GetComponentStatesRequest) - returns (milvus.ComponentStates) { - } - rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) - returns (milvus.StringResponse) { - } - rpc CreateIndex(CreateIndexRequest) returns (common.Status) { - } - // Deprecated: use DescribeIndex instead - rpc GetIndexState(GetIndexStateRequest) returns (GetIndexStateResponse) { - } - rpc GetSegmentIndexState(GetSegmentIndexStateRequest) - returns (GetSegmentIndexStateResponse) { - } - rpc GetIndexInfos(GetIndexInfoRequest) returns (GetIndexInfoResponse) { - } - rpc DropIndex(DropIndexRequest) returns (common.Status) { - } - rpc DescribeIndex(DescribeIndexRequest) returns (DescribeIndexResponse) { - } - rpc GetIndexStatistics(GetIndexStatisticsRequest) - returns (GetIndexStatisticsResponse) { - } - // Deprecated: use DescribeIndex instead - rpc GetIndexBuildProgress(GetIndexBuildProgressRequest) - returns (GetIndexBuildProgressResponse) { - } + rpc GetComponentStates(milvus.GetComponentStatesRequest) returns (milvus.ComponentStates) {} + rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){} + rpc CreateIndex(CreateIndexRequest) returns (common.Status){} + rpc AlterIndex(AlterIndexRequest) returns (common.Status){} + // Deprecated: use DescribeIndex instead + rpc GetIndexState(GetIndexStateRequest) returns (GetIndexStateResponse) {} + rpc GetSegmentIndexState(GetSegmentIndexStateRequest) returns (GetSegmentIndexStateResponse) {} + rpc GetIndexInfos(GetIndexInfoRequest) returns (GetIndexInfoResponse){} + rpc DropIndex(DropIndexRequest) returns (common.Status) {} + rpc DescribeIndex(DescribeIndexRequest) returns (DescribeIndexResponse) {} + rpc GetIndexStatistics(GetIndexStatisticsRequest) returns (GetIndexStatisticsResponse) {} + // Deprecated: use DescribeIndex instead + rpc GetIndexBuildProgress(GetIndexBuildProgressRequest) returns (GetIndexBuildProgressResponse) {} rpc ShowConfigurations(internal.ShowConfigurationsRequest) returns (internal.ShowConfigurationsResponse) { @@ -171,6 +157,12 @@ message CreateIndexRequest { repeated common.KeyValuePair user_index_params = 8; } +message AlterIndexRequest { + int64 collectionID = 1; + string index_name = 2; + repeated common.KeyValuePair params = 3; +} + message GetIndexInfoRequest { int64 collectionID = 1; repeated int64 segmentIDs = 2; diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 907441bbabb3c..f02118148a203 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1815,8 +1815,74 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde } func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexRequest) (*commonpb.Status, error) { - // Todo - return nil, nil + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return merr.Status(err), nil + } + + ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterIndex") + defer sp.End() + + task := &alterIndexTask{ + ctx: ctx, + Condition: NewTaskCondition(ctx), + req: request, + datacoord: node.dataCoord, + querycoord: node.queryCoord, + replicateMsgStream: node.replicateMsgStream, + } + + method := "AlterIndex" + tr := timerecord.NewTimeRecorder(method) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() + + log := log.Ctx(ctx).With( + zap.String("role", typeutil.ProxyRole), + zap.String("db", request.DbName), + zap.String("collection", request.CollectionName), + zap.String("indexName", request.GetIndexName()), + zap.Any("extraParams", request.ExtraParams)) + + log.Info(rpcReceived(method)) + + if err := node.sched.ddQueue.Enqueue(task); err != nil { + log.Warn( + rpcFailedToEnqueue(method), + zap.Error(err)) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, + metrics.AbandonLabel).Inc() + + return merr.Status(err), nil + } + + log.Info( + rpcEnqueued(method), + zap.Uint64("BeginTs", task.BeginTs()), + zap.Uint64("EndTs", task.EndTs())) + + if err := task.WaitToFinish(); err != nil { + log.Warn( + rpcFailedToWaitToFinish(method), + zap.Error(err), + zap.Uint64("BeginTs", task.BeginTs()), + zap.Uint64("EndTs", task.EndTs())) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, + metrics.FailLabel).Inc() + + return merr.Status(err), nil + } + + log.Info( + rpcDone(method), + zap.Uint64("BeginTs", task.BeginTs()), + zap.Uint64("EndTs", task.EndTs())) + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, + metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + return task.result, nil } // DescribeIndex get the meta information of index, such as index state, index id and etc. @@ -2655,8 +2721,9 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) } func (node *Proxy) SearchV2(ctx context.Context, request *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error) { - // Todo - return nil, nil + return &milvuspb.SearchResults{ + Status: merr.Status(merr.WrapErrServiceInternal("unimplemented")), + }, nil } func (node *Proxy) getVectorPlaceholderGroupForSearchByPks(ctx context.Context, request *milvuspb.SearchRequest) ([]byte, error) { diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 1df1c148c44a8..8feb1f3e7495a 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -1107,6 +1107,26 @@ func TestProxy(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + wg.Add(1) + t.Run("alter_index", func(t *testing.T) { + defer wg.Done() + req := &milvuspb.AlterIndexRequest{ + DbName: dbName, + CollectionName: collectionName, + IndexName: indexName, + ExtraParams: []*commonpb.KeyValuePair{ + { + Key: common.MmapEnabledKey, + Value: "true", + }, + }, + } + + resp, err := proxy.AlterIndex(ctx, req) + err = merr.CheckRPCCall(resp, err) + assert.NoError(t, err) + }) + wg.Add(1) t.Run("describe index", func(t *testing.T) { defer wg.Done() @@ -1117,9 +1137,26 @@ func TestProxy(t *testing.T) { FieldName: floatVecField, IndexName: "", }) + err = merr.CheckRPCCall(resp, err) + assert.NoError(t, err) + assert.Equal(t, indexName, resp.IndexDescriptions[0].IndexName) + assert.True(t, common.IsMmapEnabled(resp.IndexDescriptions[0].GetParams()...), "params: %+v", resp.IndexDescriptions[0]) + + // disable mmap then the tests below could continue + req := &milvuspb.AlterIndexRequest{ + DbName: dbName, + CollectionName: collectionName, + IndexName: indexName, + ExtraParams: []*commonpb.KeyValuePair{ + { + Key: common.MmapEnabledKey, + Value: "false", + }, + }, + } + status, err := proxy.AlterIndex(ctx, req) + err = merr.CheckRPCCall(status, err) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - indexName = resp.IndexDescriptions[0].IndexName }) wg.Add(1) diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index f07087f450891..1ae5f1122c21e 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -42,6 +42,7 @@ import ( const ( CreateIndexTaskName = "CreateIndexTask" + AlterIndexTaskName = "AlterIndexTask" DescribeIndexTaskName = "DescribeIndexTask" DropIndexTaskName = "DropIndexTask" GetIndexStateTaskName = "GetIndexStateTask" @@ -423,6 +424,121 @@ func (cit *createIndexTask) PostExecute(ctx context.Context) error { return nil } +type alterIndexTask struct { + Condition + req *milvuspb.AlterIndexRequest + ctx context.Context + datacoord types.DataCoordClient + querycoord types.QueryCoordClient + result *commonpb.Status + + replicateMsgStream msgstream.MsgStream + + collectionID UniqueID +} + +func (t *alterIndexTask) TraceCtx() context.Context { + return t.ctx +} + +func (t *alterIndexTask) ID() UniqueID { + return t.req.GetBase().GetMsgID() +} + +func (t *alterIndexTask) SetID(uid UniqueID) { + t.req.GetBase().MsgID = uid +} + +func (t *alterIndexTask) Name() string { + return CreateIndexTaskName +} + +func (t *alterIndexTask) Type() commonpb.MsgType { + return t.req.GetBase().GetMsgType() +} + +func (t *alterIndexTask) BeginTs() Timestamp { + return t.req.GetBase().GetTimestamp() +} + +func (t *alterIndexTask) EndTs() Timestamp { + return t.req.GetBase().GetTimestamp() +} + +func (t *alterIndexTask) SetTs(ts Timestamp) { + t.req.Base.Timestamp = ts +} + +func (t *alterIndexTask) OnEnqueue() error { + if t.req.Base == nil { + t.req.Base = commonpbutil.NewMsgBase() + } + return nil +} + +func (t *alterIndexTask) PreExecute(ctx context.Context) error { + t.req.Base.MsgType = commonpb.MsgType_AlterIndex + t.req.Base.SourceID = paramtable.GetNodeID() + + for _, param := range t.req.GetExtraParams() { + if !indexparams.IsConfigableIndexParam(param.GetKey()) { + return merr.WrapErrParameterInvalidMsg("%s is not configable index param", param.GetKey()) + } + } + + collName := t.req.GetCollectionName() + + collection, err := globalMetaCache.GetCollectionID(ctx, t.req.GetDbName(), collName) + if err != nil { + return err + } + t.collectionID = collection + + if err = validateIndexName(t.req.GetIndexName()); err != nil { + return err + } + + loaded, err := isCollectionLoaded(ctx, t.querycoord, collection) + if err != nil { + return err + } + if loaded { + return merr.WrapErrCollectionLoaded(collName, "can't alter index on loaded collection, please release the collection first") + } + + return nil +} + +func (t *alterIndexTask) Execute(ctx context.Context) error { + log := log.Ctx(ctx).With( + zap.String("collection", t.req.GetCollectionName()), + zap.String("indexName", t.req.GetIndexName()), + zap.Any("params", t.req.GetExtraParams()), + ) + + log.Info("alter index") + + var err error + req := &indexpb.AlterIndexRequest{ + CollectionID: t.collectionID, + IndexName: t.req.GetIndexName(), + Params: t.req.GetExtraParams(), + } + t.result, err = t.datacoord.AlterIndex(ctx, req) + if err != nil { + return err + } + if t.result.ErrorCode != commonpb.ErrorCode_Success { + return errors.New(t.result.Reason) + } + SendReplicateMessagePack(ctx, t.replicateMsgStream, t.req) + return nil +} + +func (t *alterIndexTask) PostExecute(ctx context.Context) error { + return nil +} + type describeIndexTask struct { Condition *milvuspb.DescribeIndexRequest diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 8ffef9c16feee..53a458f10092d 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -1568,6 +1568,11 @@ func SendReplicateMessagePack(ctx context.Context, replicateMsgStream msgstream. BaseMsg: getBaseMsg(ctx, ts), ReleasePartitionsRequest: *r, } + case *milvuspb.AlterIndexRequest: + tsMsg = &msgstream.AlterIndexMsg{ + BaseMsg: getBaseMsg(ctx, ts), + AlterIndexRequest: *r, + } default: log.Warn("unknown request", zap.Any("request", request)) return diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index a84137046b0df..acd0d9602ef2f 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -22,15 +22,19 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/indexparams" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -194,14 +198,32 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { if channel == nil { channel = ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.NextTarget) } - loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), indexes) // Get collection index info - indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID()) + indexInfos, err := ex.broker.DescribeIndex(ctx, task.CollectionID()) if err != nil { log.Warn("fail to get index meta of collection") return err } + // update the field index params + for _, segmentIndex := range indexes { + index, found := lo.Find(indexInfos, func(indexInfo *indexpb.IndexInfo) bool { + return indexInfo.IndexID == segmentIndex.IndexID + }) + if !found { + log.Warn("no collection index info for the given segment index", zap.String("indexName", segmentIndex.GetIndexName())) + } + + params := funcutil.KeyValuePair2Map(segmentIndex.GetIndexParams()) + for _, kv := range index.GetUserIndexParams() { + if indexparams.IsConfigableIndexParam(kv.GetKey()) { + params[kv.GetKey()] = kv.GetValue() + } + } + segmentIndex.IndexParams = funcutil.Map2KeyValuePair(params) + } + + loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), indexes) req := packLoadSegmentRequest( task, @@ -210,7 +232,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { collectionInfo.GetProperties(), loadMeta, loadInfo, - indexInfo, + indexInfos, ) // Get shard leader for the given replica and segment diff --git a/internal/querynodev2/segments/load_index_info.go b/internal/querynodev2/segments/load_index_info.go index 81f4b833ca745..35ace62031a77 100644 --- a/internal/querynodev2/segments/load_index_info.go +++ b/internal/querynodev2/segments/load_index_info.go @@ -27,8 +27,12 @@ import "C" import ( "unsafe" + "github.com/pingcap/log" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/indexparams" @@ -56,10 +60,15 @@ func deleteLoadIndexInfo(info *LoadIndexInfo) { C.DeleteLoadIndexInfo(info.cLoadIndexInfo) } -func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType, enableMmap bool) error { +func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error { fieldID := indexInfo.FieldID indexPaths := indexInfo.IndexFilePaths + indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams) + enableMmap := indexParams[common.MmapEnabledKey] == "true" + // as Knowhere reports error if encounter a unknown param, we need to delete it + delete(indexParams, common.MmapEnabledKey) + mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() err := li.appendFieldInfo(collectionID, partitionID, segmentID, fieldID, fieldType, enableMmap, mmapDirPath) if err != nil { @@ -72,7 +81,6 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, } // some build params also exist in indexParams, which are useless during loading process - indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams) if indexParams["index_type"] == indexparamcheck.IndexDISKANN { err = indexparams.SetDiskIndexLoadParams(paramtable.Get(), indexParams, indexInfo.GetNumRows()) if err != nil { @@ -85,6 +93,7 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, return err } + log.Info("load with index params", zap.Any("indexParams", indexParams)) for key, value := range indexParams { err = li.appendIndexParam(key, value) if err != nil { diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 1856ec0e0b627..0c6b77ca048e3 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -857,14 +857,14 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error { return nil } -func (s *LocalSegment) LoadIndex(indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType, enableMmap bool) error { +func (s *LocalSegment) LoadIndex(indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error { loadIndexInfo, err := newLoadIndexInfo() defer deleteLoadIndexInfo(loadIndexInfo) if err != nil { return err } - err = loadIndexInfo.appendLoadIndexInfo(indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType, enableMmap) + err = loadIndexInfo.appendLoadIndexInfo(indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType) if err != nil { if loadIndexInfo.cleanLocalData() != nil { log.Warn("failed to clean cached data on disk after append index failed", diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 345677989d90d..8759419f04ebf 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -727,7 +727,7 @@ func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalS return merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load field index") } - return segment.LoadIndex(indexInfo, fieldType, common.IsFieldMmapEnabled(collection.Schema(), indexInfo.GetFieldID())) + return segment.LoadIndex(indexInfo, fieldType) } func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet, diff --git a/pkg/go.mod b/pkg/go.mod index 1a092c7a46dbc..93eb0b64cde04 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.16.5 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c github.com/nats-io/nats-server/v2 v2.9.17 github.com/nats-io/nats.go v1.24.0 github.com/panjf2000/ants/v2 v2.7.2 @@ -44,6 +44,7 @@ require ( golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 golang.org/x/net v0.17.0 golang.org/x/sync v0.1.0 + golang.org/x/sys v0.13.0 google.golang.org/grpc v1.54.0 google.golang.org/protobuf v1.30.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 @@ -156,7 +157,6 @@ require ( go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/multierr v1.7.0 // indirect golang.org/x/oauth2 v0.6.0 // indirect - golang.org/x/sys v0.13.0 // indirect golang.org/x/term v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect diff --git a/pkg/go.sum b/pkg/go.sum index 5c20be4bdfb91..5376060c9b99f 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -477,12 +477,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2-0.20231008032233-5d64d443769d h1:K8yyzz8BCBm+wirhRgySyB8wN+sw33eB3VsLz6Slu5s= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2-0.20231008032233-5d64d443769d/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e h1:IH1WAXwEF8vbwahPdupi4zzRNWViT4B7fZzIjtRLpG4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f h1:0cAMN9OsgBxlEUY8i1e1ocrBZ/cpu/Kdguz4JWz9fUc= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c h1:Wbc2IZt/13+B5jc8JPU/dOxGYy+1jeOsChVgcza+qgw= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= diff --git a/pkg/mq/msgstream/msg_for_index.go b/pkg/mq/msgstream/msg_for_index.go index 063e008daa74a..96c593d8bb731 100644 --- a/pkg/mq/msgstream/msg_for_index.go +++ b/pkg/mq/msgstream/msg_for_index.go @@ -86,6 +86,68 @@ func (it *CreateIndexMsg) Size() int { return proto.Size(&it.CreateIndexRequest) } +// AlterIndexMsg is a message pack that contains create index request +type AlterIndexMsg struct { + BaseMsg + milvuspb.AlterIndexRequest +} + +// interface implementation validation +var _ TsMsg = &AlterIndexMsg{} + +// ID returns the ID of this message pack +func (it *AlterIndexMsg) ID() UniqueID { + return it.Base.MsgID +} + +// SetID set the ID of this message pack +func (it *AlterIndexMsg) SetID(id UniqueID) { + it.Base.MsgID = id +} + +// Type returns the type of this message pack +func (it *AlterIndexMsg) Type() MsgType { + return it.Base.MsgType +} + +// SourceID indicates which component generated this message +func (it *AlterIndexMsg) SourceID() int64 { + return it.Base.SourceID +} + +// Marshal is used to serialize a message pack to byte array +func (it *AlterIndexMsg) Marshal(input TsMsg) (MarshalType, error) { + AlterIndexMsg := input.(*AlterIndexMsg) + AlterIndexRequest := &AlterIndexMsg.AlterIndexRequest + mb, err := proto.Marshal(AlterIndexRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +// Unmarshal is used to deserialize a message pack from byte array +func (it *AlterIndexMsg) Unmarshal(input MarshalType) (TsMsg, error) { + alterIndexRequest := milvuspb.AlterIndexRequest{} + in, err := convertToByteArray(input) + if err != nil { + return nil, err + } + err = proto.Unmarshal(in, &alterIndexRequest) + if err != nil { + return nil, err + } + alterIndexMsg := &AlterIndexMsg{AlterIndexRequest: alterIndexRequest} + alterIndexMsg.BeginTimestamp = alterIndexMsg.GetBase().GetTimestamp() + alterIndexMsg.EndTimestamp = alterIndexMsg.GetBase().GetTimestamp() + + return alterIndexMsg, nil +} + +func (it *AlterIndexMsg) Size() int { + return proto.Size(&it.AlterIndexRequest) +} + // DropIndexMsg is a message pack that contains drop index request type DropIndexMsg struct { BaseMsg diff --git a/pkg/util/funcutil/policy_test.go b/pkg/util/funcutil/policy_test.go index 93395768accf6..2eadb491b93bd 100644 --- a/pkg/util/funcutil/policy_test.go +++ b/pkg/util/funcutil/policy_test.go @@ -20,7 +20,7 @@ func Test_GetPrivilegeExtObj(t *testing.T) { assert.Equal(t, commonpb.ObjectPrivilege_PrivilegeLoad, privilegeExt.ObjectPrivilege) assert.Equal(t, int32(3), privilegeExt.ObjectNameIndex) - request2 := &milvuspb.GetPartitionStatisticsRequest{} + request2 := &milvuspb.ListAliasesRequest{} _, err = GetPrivilegeExtObj(request2) assert.Error(t, err) } diff --git a/pkg/util/indexparams/index_params.go b/pkg/util/indexparams/index_params.go index b26b2593b79c3..978a8b525c765 100644 --- a/pkg/util/indexparams/index_params.go +++ b/pkg/util/indexparams/index_params.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -48,6 +49,16 @@ const ( MaxBeamWidth = 16 ) +var configableIndexParams = typeutil.NewSet[string]() + +func init() { + configableIndexParams.Insert(common.MmapEnabledKey) +} + +func IsConfigableIndexParam(key string) bool { + return configableIndexParams.Contain(key) +} + func getRowDataSizeOfFloatVector(numRows int64, dim int64) int64 { var floatValue float32 /* #nosec G103 */ diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index d83358ea95806..3f3a3095f94f8 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -49,6 +49,7 @@ var ( ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false) ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false) ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) + ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false) // Partition related ErrPartitionNotFound = newMilvusError("partition not found", 200, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 546f1e9b45d24..adead2617f40c 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -85,6 +85,7 @@ func (s *ErrSuite) TestWrap() { s.ErrorIs(WrapErrCollectionNotFound("test_collection", "failed to get collection"), ErrCollectionNotFound) s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to query"), ErrCollectionNotLoaded) s.ErrorIs(WrapErrCollectionNotFullyLoaded("test_collection", "failed to query"), ErrCollectionNotFullyLoaded) + s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to alter index %s", "hnsw"), ErrCollectionNotLoaded) // Partition related s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index b093cf1291c5a..abe3178d4429f 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -456,6 +456,15 @@ func WrapErrCollectionNotFullyLoaded(collection any, msg ...string) error { return err } +func WrapErrCollectionLoaded(collection string, msgAndArgs ...any) error { + err := wrapFields(ErrCollectionLoaded, value("collection", collection)) + if len(msgAndArgs) > 0 { + msg := msgAndArgs[0].(string) + err = errors.Wrapf(err, msg, msgAndArgs[1:]...) + } + return err +} + func WrapErrAliasNotFound(db any, alias any, msg ...string) error { err := wrapFields(ErrAliasNotFound, value("database", db),