// NOTE(review): the text below is a git unified diff, not a Go source file, and its
// original line breaks appear to have been collapsed into single spaces. It probably
// will not apply in this form; restore the line structure from the original patch
// before use.
//
// Summary of the visible changes, per file header:
//   - master/internal/db/postgres_cluster.go: in UpdateResourceAllocationAggregation the
//     "last aggregate" query now casts max(date) to timestamp before date_trunc('day', ...).
//   - master/internal/db/postgres_cluster_intg_test.go: the inline mock user/job/task setup
//     in TestClusterAPI moves into the new helper CreateMockJobAndTask, which returns the
//     created job and task. The patch also adds the allocAggTest case struct,
//     TestUpdateResourceAllocationAggregation (three cases that run SET TIME ZONE for
//     'Etc/UTC', 'Europe/Athens' and 'America/Montreal'), and the helper
//     setupUpdateResourceAllocationAggregation. That helper sets the time zone, sums the
//     slot-seconds already recorded for the target day, deletes resource_aggregates rows
//     with date >= that day, and inserts one new allocation.
//   - master/static/srv/update_aggregated_allocation.sql: tstzrange / timestamptz are
//     replaced by tsrange / timestamp. The last hunk is cut off at the end of this view.
//
// NOTE(review): the time zone is set with a separate Exec on bunDB, while the code under
// test runs through db.UpdateResourceAllocationAggregation. Whether both use the same
// pooled connection (and therefore the same session time zone) is not visible here;
// confirm.
// NOTE(review): require.InEpsilon is called with 0.5, which presumably is a relative
// tolerance of 50%; verify that such a loose bound is intended.
diff --git a/master/internal/db/postgres_cluster.go b/master/internal/db/postgres_cluster.go index 2c6b5512dcf..568a9799467 100644 --- a/master/internal/db/postgres_cluster.go +++ b/master/internal/db/postgres_cluster.go @@ -91,7 +91,7 @@ SELECT jsonb_build_object( func (db *PgDB) UpdateResourceAllocationAggregation() error { var lastDatePtr *time.Time err := db.sql.QueryRow( - `SELECT date_trunc('day', max(date)) FROM resource_aggregates`, + `SELECT date_trunc('day', max(date)::timestamp) FROM resource_aggregates`, ).Scan(&lastDatePtr) if err != nil { return errors.Wrap(err, "failed to find last aggregate") diff --git a/master/internal/db/postgres_cluster_intg_test.go b/master/internal/db/postgres_cluster_intg_test.go index a4fc8aedb90..93e143abefe 100644 --- a/master/internal/db/postgres_cluster_intg_test.go +++ b/master/internal/db/postgres_cluster_intg_test.go @@ -5,10 +5,15 @@ package db import ( "context" + "fmt" + "strings" "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" + "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect/pgdialect" "github.com/determined-ai/determined/master/pkg/etc" "github.com/determined-ai/determined/master/pkg/model" @@ -25,32 +30,8 @@ func TestClusterAPI(t *testing.T) { _, err := db.GetOrCreateClusterID("") require.NoError(t, err, "failed to get or create cluster id") - // Add a mock user - user := RequireMockUser(t, db) - - // Add a job - jID := model.NewJobID() - jIn := &model.Job{ - JobID: jID, - JobType: model.JobTypeExperiment, - OwnerID: &user.ID, - } - - err = AddJob(jIn) - require.NoError(t, err, "failed to add job") - - // Add a task - tID := model.NewTaskID() - tIn := &model.Task{ - TaskID: tID, - JobID: &jID, - TaskType: model.TaskTypeTrial, - StartTime: time.Now().UTC().Truncate(time.Millisecond), - } - - err = AddTask(context.TODO(), tIn) - require.NoError(t, err, "failed to add task") - + _, tIn := CreateMockJobAndTask(t, db) + tID := tIn.TaskID // Add an allocation aID := 
model.AllocationID(string(tID) + "-1") aIn := &model.Allocation{ @@ -87,3 +68,181 @@ func TestClusterAPI(t *testing.T) { "Expected end time of open allocation is = %q but it is = %q instead", clusterHeartbeat.String(), aOut.EndTime.String()) } + +func CreateMockJobAndTask(t *testing.T, db *PgDB) (*model.Job, *model.Task) { + // Add a mock user + user := RequireMockUser(t, db) + + // Add a job + jID := model.NewJobID() + jIn := &model.Job{ + JobID: jID, + JobType: model.JobTypeExperiment, + OwnerID: &user.ID, + } + + err := AddJob(jIn) + require.NoError(t, err, "failed to add job") + + // Add a task + tID := model.NewTaskID() + tIn := &model.Task{ + TaskID: tID, + JobID: &jID, + TaskType: model.TaskTypeTrial, + StartTime: time.Now().UTC().Truncate(time.Millisecond), + } + + err = AddTask(context.TODO(), tIn) + require.NoError(t, err, "failed to add task") + + return jIn, tIn +} + +type allocAggTest struct { + name string + tzQuery string + timeOfDay string + numSlots int + seconds int +} + +func TestUpdateResourceAllocationAggregation(t *testing.T) { + require.NoError(t, etc.SetRootPath(RootFromDB)) + + db, closeDB := MustResolveTestPostgres(t) + defer closeDB() + MustMigrateTestPostgres(t, db, MigrationsFromDB) + + ctx := context.Background() + bunDB := bun.NewDB(db.sql.DB, pgdialect.New()) + + today := time.Now() + + tests := []allocAggTest{ + { + name: "UTC basic add", + tzQuery: `SET TIME ZONE 'Etc/UTC'`, + timeOfDay: "04:20:00AM", + numSlots: 5, + seconds: 5, + }, + { + name: "Positive UTC offset", + tzQuery: `SET TIME ZONE 'Europe/Athens'`, + timeOfDay: "11:30:00PM", + numSlots: 5, + seconds: 10, + }, + { + name: "Negative UTC offset", + tzQuery: `SET TIME ZONE 'America/Montreal'`, + timeOfDay: "01:20:00AM", + numSlots: 7, + seconds: 2, + }, + } + + type resourceAggregate struct { + bun.BaseModel `bun:"table:resource_aggregates"` + Date time.Time `bun:"date"` + Seconds float64 `bun:"seconds"` + } + + for ind, test := range tests { + t.Run(test.name, func(t 
*testing.T) { + offset := -1 * (480 - (24 * ind)) + recentDate := today.Add(time.Hour * time.Duration(offset)) + formattedDate, prevSeconds := setupUpdateResourceAllocationAggregation(ctx, t, db, + bunDB, recentDate, test) + err := db.UpdateResourceAllocationAggregation() + require.NoError(t, err) + + ra := resourceAggregate{} + err = bunDB.NewSelect(). + Model(&ra). + Where("date = ? AND aggregation_type = ?", formattedDate, "total"). + Scan(ctx) + require.NoError(t, err) + + var expectedSeconds interface{} = float64(test.numSlots*test.seconds) + prevSeconds + var seconds interface{} = ra.Seconds + require.InEpsilon(t, expectedSeconds, seconds, 0.5) + }) + } +} + +func setupUpdateResourceAllocationAggregation(ctx context.Context, t *testing.T, db *PgDB, + bunDB *bun.DB, recentDate time.Time, test allocAggTest, +) (string, float64) { + // (Setup) Set the timezone. + _, err := bunDB.NewRaw(test.tzQuery).Exec(ctx) + require.NoError(t, err) + + _, tIn := CreateMockJobAndTask(t, db) + tID := tIn.TaskID + + formattedDate := recentDate.Format(time.DateOnly) + yearMonthDay := strings.Split(formattedDate, "-") + startDate := fmt.Sprintf("%s/%s %s %s +00", yearMonthDay[1], yearMonthDay[2], test.timeOfDay, + yearMonthDay[0]) + + // (Setup) the allocation's start and end time. 
+ startTime, err := time.Parse("01/02 03:04:05PM 2006 -07", startDate) + require.NoError(t, err) + + var prevSeconds float64 + err = bunDB.NewRaw(` + WITH d AS ( + SELECT + tsrange( + ?::timestamp, + (?::timestamp + interval '1 day') + ) AS period + ), + allocs_in_range AS ( + SELECT + extract( + EPOCH + FROM + upper(d.period * alloc.range) - lower(d.period * alloc.range) + ) * alloc.slots::float AS seconds + FROM + ( + SELECT + slots, + tsrange(start_time, greatest(start_time, end_time)) AS range + FROM + allocations + WHERE + start_time IS NOT NULL + ) AS alloc, + d + WHERE + d.period && alloc.range + ) + SELECT coalesce(sum(allocs_in_range.seconds), 0) FROM allocs_in_range + `, formattedDate, formattedDate).Scan(ctx, &prevSeconds) + require.NoError(t, err) + + endTime := startTime.Add(time.Second * time.Duration(test.seconds)) + allocID := uuid.NewString() + alloc := model.Allocation{ + AllocationID: *model.NewAllocationID(&allocID), + TaskID: tID, + Slots: test.numSlots, + ResourcePool: uuid.NewString(), + StartTime: &startTime, + EndTime: &endTime, + } + _, err = bunDB.NewDelete(). + Table("resource_aggregates"). + Where("date >= ?", formattedDate). + Exec(ctx) + require.NoError(t, err) + + _, err = bunDB.NewInsert().Model(&alloc).Exec(ctx) + require.NoError(t, err) + + return formattedDate, prevSeconds +} diff --git a/master/static/srv/update_aggregated_allocation.sql b/master/static/srv/update_aggregated_allocation.sql index b08ec615fdb..41b711a7757 100644 --- a/master/static/srv/update_aggregated_allocation.sql +++ b/master/static/srv/update_aggregated_allocation.sql @@ -1,8 +1,8 @@ WITH const AS ( SELECT - tstzrange( - $1::timestamptz, - ($1::timestamptz + interval '1 day') + tsrange( + $1::timestamp, + ($1::timestamp + interval '1 day') ) AS period ), @@ -21,7 +21,7 @@ allocs_in_range AS ( ( SELECT *, - tstzrange(start_time, greatest(start_time, end_time)) AS range + tsrange(start_time, greatest(start_time, end_time)) AS range FROM allocations WHERE