[ENH]: add purging log query and property test (#2020)
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Make collection ordering asc
	 - Add log purge query
	 - Improve property/model testing
nicolasgere authored Apr 24, 2024
1 parent 413174d commit fa44ff2
Showing 7 changed files with 58 additions and 9 deletions.
9 changes: 9 additions & 0 deletions go/database/log/db/queries.sql.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions go/database/log/queries/queries.sql
@@ -29,3 +29,6 @@ UPDATE collection set record_enumeration_offset_position = $2 where id = $1;

-- name: InsertCollection :one
INSERT INTO collection (id, record_enumeration_offset_position, record_compaction_offset_position) values($1, $2, $3) returning *;

+ -- name: PurgeRecords :exec
+ DELETE FROM record_log r using collection c where r.collection_id = c.id and r.offset < c.record_compaction_offset_position;
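
The generated go/database/log/db/queries.sql.go is not rendered above, so as orientation here is a minimal sketch of what sqlc's standard `:exec` codegen typically produces for this query. The package name, receiver, and method body are assumptions based on sqlc/pgx conventions, not the actual generated file.

```go
// Sketch only: approximate shape of the sqlc-generated PurgeRecords method.
package log

import "context"

const purgeRecords = `-- name: PurgeRecords :exec
DELETE FROM record_log r using collection c where r.collection_id = c.id and r.offset < c.record_compaction_offset_position
`

// PurgeRecords deletes every record whose offset is strictly below its
// collection's compaction offset, i.e. records that have already been compacted.
func (q *Queries) PurgeRecords(ctx context.Context) error {
	_, err := q.db.Exec(ctx, purgeRecords)
	return err
}
```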
5 changes: 5 additions & 0 deletions go/pkg/log/repository/log.go
@@ -90,6 +90,11 @@ func (r *LogRepository) UpdateCollectionCompactionOffsetPosition(ctx context.Con
return
}

+ func (r *LogRepository) PurgeRecords(ctx context.Context) (err error) {
+ err = r.queries.PurgeRecords(ctx)
+ return
+ }

func NewLogRepository(conn *pgx.Conn) *LogRepository {
return &LogRepository{
conn: conn,
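
Within this PR the new repository method is only exercised by the property test below. Purely as an illustration of intended usage, a hypothetical background loop that purges compacted records on a timer could look like the following; the package, function name, interval, and logging are assumptions, not part of this change.

```go
package logpurge // hypothetical helper package, not in the repository

import (
	"context"
	stdlog "log"
	"time"

	"github.com/chroma-core/chroma/go/pkg/log/repository"
)

// runPurgeLoop calls PurgeRecords on a fixed interval until ctx is cancelled.
func runPurgeLoop(ctx context.Context, lr *repository.LogRepository, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := lr.PurgeRecords(ctx); err != nil {
				stdlog.Printf("failed to purge compacted records: %v", err)
			}
		}
	}
}
```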
35 changes: 32 additions & 3 deletions go/pkg/log/server/property_test.go
@@ -2,6 +2,7 @@ package server

import (
"context"
log "github.com/chroma-core/chroma/go/database/log/db"
"github.com/chroma-core/chroma/go/pkg/log/configuration"
"github.com/chroma-core/chroma/go/pkg/log/repository"
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
@@ -27,6 +28,7 @@ type LogServerTestSuite struct {
logServer logservicepb.LogServiceServer
model ModelState
t *testing.T
+ lr *repository.LogRepository
}

func (suite *LogServerTestSuite) SetupSuite() {
@@ -40,8 +42,8 @@ func (suite *LogServerTestSuite) SetupSuite() {
assert.NoError(suite.t, err, "Failed to create new pg connection")
err = libs2.RunMigration(ctx, connectionString)
assert.NoError(suite.t, err, "Failed to run migration")
- lr := repository.NewLogRepository(conn)
- suite.logServer = NewLogServer(lr)
+ suite.lr = repository.NewLogRepository(conn)
+ suite.logServer = NewLogServer(suite.lr)
suite.model = ModelState{
CollectionData: map[types.UniqueID][]*coordinatorpb.OperationRecord{},
CollectionCompactionOffset: map[types.UniqueID]int64{},
@@ -117,11 +119,23 @@ func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() {
CollectionId: c.String(),
StartFromOffset: startOffset,
BatchSize: batchSize,
- EndTimestamp: time.Now().Unix(),
+ EndTimestamp: time.Now().UnixNano(),
})
if err != nil {
t.Fatal(err)
}
+ // Verify that the number of records returned is correct
+ if len(suite.model.CollectionData[c]) > int(startOffset) {
+ if len(suite.model.CollectionData[c])-int(startOffset) < int(batchSize) {
+ suite.Equal(len(response.Records), len(suite.model.CollectionData[c])-int(startOffset)+1)
+ } else {
+ suite.Equal(len(response.Records), int(batchSize))
+ }
+ }
+ // Verify that the first record offset is correct
+ if len(response.Records) > 0 {
+ suite.Equal(response.Records[0].LogOffset, startOffset)
+ }
// Verify that record returned is matching the expected record
for _, record := range response.Records {
expectedRecord := suite.model.CollectionData[c][record.LogOffset-1]
@@ -150,6 +164,21 @@ }
}
}
},
"purgeLogs": func(t *rapid.T) {
err := suite.lr.PurgeRecords(ctx)
suite.NoError(err)
// Verify that all record logs are purged
for id, offset := range suite.model.CollectionCompactionOffset {
if offset != 0 {
var records []log.RecordLog
records, err = suite.lr.PullRecords(ctx, id.String(), 0, 1, time.Now().UnixNano())
suite.NoError(err)
if len(records) > 0 {
suite.Equal(offset, records[0].Offset)
}
}
}
},
})
})
}
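
For readers skimming the purgeLogs action above, the property it checks is: after a purge, a compacted collection that still has records must have its oldest surviving record at the compaction offset, because PurgeRecords deletes strictly smaller offsets. Below is a standalone restatement of that check, assuming the RecordLog row type from the generated db package imported as `log` in the test; it is illustrative and not part of the PR.

```go
// Not part of the PR: a restatement of the "purgeLogs" assertion for clarity.
package server

import log "github.com/chroma-core/chroma/go/database/log/db"

// purgeInvariantHolds reports whether the post-purge state matches the model.
func purgeInvariantHolds(compactionOffset int64, remaining []log.RecordLog) bool {
	if compactionOffset == 0 || len(remaining) == 0 {
		// The test only asserts when the collection was compacted and records remain.
		return true
	}
	// Offsets strictly below the compaction offset were deleted, so the oldest
	// surviving record is the one at the compaction offset itself.
	return remaining[0].Offset == compactionOffset
}
```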
10 changes: 7 additions & 3 deletions go/pkg/metastore/db/dao/collection.go
@@ -33,10 +33,10 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string,
var collections []*dbmodel.CollectionAndMetadata

query := s.db.Table("collections").
Select("collections.id, collections.log_position, collections.version, collections.name, collections.dimension, collections.database_id, databases.name, databases.tenant_id, collection_metadata.key, collection_metadata.str_value, collection_metadata.int_value, collection_metadata.float_value").
Select("collections.id, collections.log_position, collections.version, collections.name, collections.dimension, collections.database_id, collections.created_at, databases.name, databases.tenant_id, collection_metadata.key, collection_metadata.str_value, collection_metadata.int_value, collection_metadata.float_value").
Joins("LEFT JOIN collection_metadata ON collections.id = collection_metadata.collection_id").
Joins("INNER JOIN databases ON collections.database_id = databases.id").
Order("collections.id")
Order("collections.id asc")
if limit != nil {
query = query.Limit(int(*limit))
getCollectionInput.WriteString("limit: " + string(*limit) + ", ")
@@ -85,6 +85,7 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string,
collectionName string
collectionDimension sql.NullInt32
collectionDatabaseID string
+ collectionCreatedAt sql.NullTime
databaseName string
databaseTenantID string
key sql.NullString
@@ -93,7 +94,7 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string,
floatValue sql.NullFloat64
)

err := rows.Scan(&collectionID, &logPosition, &version, &collectionName, &collectionDimension, &collectionDatabaseID, &databaseName, &databaseTenantID, &key, &strValue, &intValue, &floatValue)
err := rows.Scan(&collectionID, &logPosition, &version, &collectionName, &collectionDimension, &collectionDatabaseID, &collectionCreatedAt, &databaseName, &databaseTenantID, &key, &strValue, &intValue, &floatValue)
if err != nil {
log.Error("scan collection failed", zap.Error(err))
return nil, err
@@ -119,6 +120,9 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string,
} else {
currentCollection.Collection.Dimension = nil
}
+ if collectionCreatedAt.Valid {
+ currentCollection.Collection.CreatedAt = collectionCreatedAt.Time
+ }

if currentCollectionID != "" {
collections = append(collections, currentCollection)
3 changes: 2 additions & 1 deletion go/pkg/metastore/db/dao/test_utils.go
@@ -138,7 +138,8 @@ func CreateTestCollection(db *gorm.DB, collectionName string, dimension int32, d
return "", err
}
}

+ // Sleep so that no two collections share the same creation time: Postgres has millisecond precision here, and a unit test can create multiple collections within the same millisecond.
+ time.Sleep(10 * time.Millisecond)
return collectionId, nil
}

2 changes: 0 additions & 2 deletions go/pkg/metastore/db/dbmodel/mocks/IMetaDomain.go

Some generated files are not rendered by default.
