From fa44ff20ff0f83d19fb79fa028221b55f732ea2b Mon Sep 17 00:00:00 2001 From: nicolasgere Date: Wed, 24 Apr 2024 15:29:22 -0700 Subject: [PATCH] [ENH]: add purging log query and property test (#2020) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Make collection ordering asc - Add log purge query - Improve property/model testing --- go/database/log/db/queries.sql.go | 9 +++++ go/database/log/queries/queries.sql | 3 ++ go/pkg/log/repository/log.go | 5 +++ go/pkg/log/server/property_test.go | 35 +++++++++++++++++-- go/pkg/metastore/db/dao/collection.go | 10 ++++-- go/pkg/metastore/db/dao/test_utils.go | 3 +- .../metastore/db/dbmodel/mocks/IMetaDomain.go | 2 -- 7 files changed, 58 insertions(+), 9 deletions(-) diff --git a/go/database/log/db/queries.sql.go b/go/database/log/db/queries.sql.go index 8dc1d8dc6f9..b4a306bd582 100644 --- a/go/database/log/db/queries.sql.go +++ b/go/database/log/db/queries.sql.go @@ -132,6 +132,15 @@ type InsertRecordParams struct { Timestamp int64 } +const purgeRecords = `-- name: PurgeRecords :exec +DELETE FROM record_log r using collection c where r.collection_id = c.id and r.offset < c.record_compaction_offset_position +` + +func (q *Queries) PurgeRecords(ctx context.Context) error { + _, err := q.db.Exec(ctx, purgeRecords) + return err +} + const updateCollectionCompactionOffsetPosition = `-- name: UpdateCollectionCompactionOffsetPosition :exec UPDATE collection set record_compaction_offset_position = $2 where id = $1 ` diff --git a/go/database/log/queries/queries.sql b/go/database/log/queries/queries.sql index d0a4967ab60..98e27a7a4be 100644 --- a/go/database/log/queries/queries.sql +++ b/go/database/log/queries/queries.sql @@ -29,3 +29,6 @@ UPDATE collection set record_enumeration_offset_position = $2 where id = $1; -- name: InsertCollection :one INSERT INTO collection (id, record_enumeration_offset_position, record_compaction_offset_position) values($1, $2, $3) returning *; + +-- name: 
PurgeRecords :exec +DELETE FROM record_log r using collection c where r.collection_id = c.id and r.offset < c.record_compaction_offset_position; diff --git a/go/pkg/log/repository/log.go b/go/pkg/log/repository/log.go index 7a6c5c57aca..ffba86fc747 100644 --- a/go/pkg/log/repository/log.go +++ b/go/pkg/log/repository/log.go @@ -90,6 +90,11 @@ func (r *LogRepository) UpdateCollectionCompactionOffsetPosition(ctx context.Con return } +func (r *LogRepository) PurgeRecords(ctx context.Context) (err error) { + err = r.queries.PurgeRecords(ctx) + return +} + func NewLogRepository(conn *pgx.Conn) *LogRepository { return &LogRepository{ conn: conn, diff --git a/go/pkg/log/server/property_test.go b/go/pkg/log/server/property_test.go index 5583dbb066b..5e90e92683a 100644 --- a/go/pkg/log/server/property_test.go +++ b/go/pkg/log/server/property_test.go @@ -2,6 +2,7 @@ package server import ( "context" + log "github.com/chroma-core/chroma/go/database/log/db" "github.com/chroma-core/chroma/go/pkg/log/configuration" "github.com/chroma-core/chroma/go/pkg/log/repository" "github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb" @@ -27,6 +28,7 @@ type LogServerTestSuite struct { logServer logservicepb.LogServiceServer model ModelState t *testing.T + lr *repository.LogRepository } func (suite *LogServerTestSuite) SetupSuite() { @@ -40,8 +42,8 @@ func (suite *LogServerTestSuite) SetupSuite() { assert.NoError(suite.t, err, "Failed to create new pg connection") err = libs2.RunMigration(ctx, connectionString) assert.NoError(suite.t, err, "Failed to run migration") - lr := repository.NewLogRepository(conn) - suite.logServer = NewLogServer(lr) + suite.lr = repository.NewLogRepository(conn) + suite.logServer = NewLogServer(suite.lr) suite.model = ModelState{ CollectionData: map[types.UniqueID][]*coordinatorpb.OperationRecord{}, CollectionCompactionOffset: map[types.UniqueID]int64{}, @@ -117,11 +119,23 @@ func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() { CollectionId: 
c.String(), StartFromOffset: startOffset, BatchSize: batchSize, - EndTimestamp: time.Now().Unix(), + EndTimestamp: time.Now().UnixNano(), }) if err != nil { t.Fatal(err) } + // Verify that the number of records returned is correct + if len(suite.model.CollectionData[c]) > int(startOffset) { + if len(suite.model.CollectionData[c])-int(startOffset) < int(batchSize) { + suite.Equal(len(response.Records), len(suite.model.CollectionData[c])-int(startOffset)+1) + } else { + suite.Equal(len(response.Records), int(batchSize)) + } + } + // Verify that the first record offset is correct + if len(response.Records) > 0 { + suite.Equal(response.Records[0].LogOffset, startOffset) + } // Verify that record returned is matching the expected record for _, record := range response.Records { expectedRecord := suite.model.CollectionData[c][record.LogOffset-1] @@ -150,6 +164,21 @@ func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() { } } }, + "purgeLogs": func(t *rapid.T) { + err := suite.lr.PurgeRecords(ctx) + suite.NoError(err) + // Verify that all record logs are purged + for id, offset := range suite.model.CollectionCompactionOffset { + if offset != 0 { + var records []log.RecordLog + records, err = suite.lr.PullRecords(ctx, id.String(), 0, 1, time.Now().UnixNano()) + suite.NoError(err) + if len(records) > 0 { + suite.Equal(offset, records[0].Offset) + } + } + } + }, }) }) } diff --git a/go/pkg/metastore/db/dao/collection.go b/go/pkg/metastore/db/dao/collection.go index 82a12110f2e..fb2bb7dced5 100644 --- a/go/pkg/metastore/db/dao/collection.go +++ b/go/pkg/metastore/db/dao/collection.go @@ -33,10 +33,10 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string, var collections []*dbmodel.CollectionAndMetadata query := s.db.Table("collections"). 
- Select("collections.id, collections.log_position, collections.version, collections.name, collections.dimension, collections.database_id, databases.name, databases.tenant_id, collection_metadata.key, collection_metadata.str_value, collection_metadata.int_value, collection_metadata.float_value"). + Select("collections.id, collections.log_position, collections.version, collections.name, collections.dimension, collections.database_id, collections.created_at, databases.name, databases.tenant_id, collection_metadata.key, collection_metadata.str_value, collection_metadata.int_value, collection_metadata.float_value"). Joins("LEFT JOIN collection_metadata ON collections.id = collection_metadata.collection_id"). Joins("INNER JOIN databases ON collections.database_id = databases.id"). - Order("collections.id") + Order("collections.id asc") if limit != nil { query = query.Limit(int(*limit)) getCollectionInput.WriteString("limit: " + string(*limit) + ", ") @@ -85,6 +85,7 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string, collectionName string collectionDimension sql.NullInt32 collectionDatabaseID string + collectionCreatedAt sql.NullTime databaseName string databaseTenantID string key sql.NullString @@ -93,7 +94,7 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string, floatValue sql.NullFloat64 ) - err := rows.Scan(&collectionID, &logPosition, &version, &collectionName, &collectionDimension, &collectionDatabaseID, &databaseName, &databaseTenantID, &key, &strValue, &intValue, &floatValue) + err := rows.Scan(&collectionID, &logPosition, &version, &collectionName, &collectionDimension, &collectionDatabaseID, &collectionCreatedAt, &databaseName, &databaseTenantID, &key, &strValue, &intValue, &floatValue) if err != nil { log.Error("scan collection failed", zap.Error(err)) return nil, err @@ -119,6 +120,9 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string, } else { 
currentCollection.Collection.Dimension = nil } + if collectionCreatedAt.Valid { + currentCollection.Collection.CreatedAt = collectionCreatedAt.Time + } if currentCollectionID != "" { collections = append(collections, currentCollection) diff --git a/go/pkg/metastore/db/dao/test_utils.go b/go/pkg/metastore/db/dao/test_utils.go index 5cb5734b183..34d9f24d360 100644 --- a/go/pkg/metastore/db/dao/test_utils.go +++ b/go/pkg/metastore/db/dao/test_utils.go @@ -138,7 +138,8 @@ func CreateTestCollection(db *gorm.DB, collectionName string, dimension int32, d return "", err } } - + // Avoid giving multiple collections the same creation time: postgres has millisecond precision, and in unit tests multiple collections can be created in the same millisecond + time.Sleep(10 * time.Millisecond) return collectionId, nil } diff --git a/go/pkg/metastore/db/dao/test_utils.go b/go/pkg/metastore/db/dbmodel/mocks/IMetaDomain.go b/go/pkg/metastore/db/dbmodel/mocks/IMetaDomain.go index 6c05bb8c624..afe6edeea86 100644 --- a/go/pkg/metastore/db/dbmodel/mocks/IMetaDomain.go +++ b/go/pkg/metastore/db/dbmodel/mocks/IMetaDomain.go @@ -126,8 +126,6 @@ func (_m *IMetaDomain) TenantDb(ctx context.Context) dbmodel.ITenantDb { return r0 } - - // NewIMetaDomain creates a new instance of IMetaDomain. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewIMetaDomain(t interface {