Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: inconsistent timezone handling in daily allocation aggregation #9888

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion master/internal/db/postgres_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ SELECT jsonb_build_object(
func (db *PgDB) UpdateResourceAllocationAggregation() error {
var lastDatePtr *time.Time
err := db.sql.QueryRow(
`SELECT date_trunc('day', max(date)) FROM resource_aggregates`,
`SELECT date_trunc('day', max(date)::timestamp) FROM resource_aggregates`,
).Scan(&lastDatePtr)
if err != nil {
return errors.Wrap(err, "failed to find last aggregate")
Expand Down
211 changes: 185 additions & 26 deletions master/internal/db/postgres_cluster_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ package db

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"

"github.com/determined-ai/determined/master/pkg/etc"
"github.com/determined-ai/determined/master/pkg/model"
Expand All @@ -25,32 +30,8 @@ func TestClusterAPI(t *testing.T) {
_, err := db.GetOrCreateClusterID("")
require.NoError(t, err, "failed to get or create cluster id")

// Add a mock user
user := RequireMockUser(t, db)

// Add a job
jID := model.NewJobID()
jIn := &model.Job{
JobID: jID,
JobType: model.JobTypeExperiment,
OwnerID: &user.ID,
}

err = AddJob(jIn)
require.NoError(t, err, "failed to add job")

// Add a task
tID := model.NewTaskID()
tIn := &model.Task{
TaskID: tID,
JobID: &jID,
TaskType: model.TaskTypeTrial,
StartTime: time.Now().UTC().Truncate(time.Millisecond),
}

err = AddTask(context.TODO(), tIn)
require.NoError(t, err, "failed to add task")

_, tIn := CreateMockJobAndTask(t, db)
tID := tIn.TaskID
// Add an allocation
aID := model.AllocationID(string(tID) + "-1")
aIn := &model.Allocation{
Expand Down Expand Up @@ -87,3 +68,181 @@ func TestClusterAPI(t *testing.T) {
"Expected end time of open allocation is = %q but it is = %q instead",
clusterHeartbeat.String(), aOut.EndTime.String())
}

func CreateMockJobAndTask(t *testing.T, db *PgDB) (*model.Job, *model.Task) {
// Add a mock user
user := RequireMockUser(t, db)

// Add a job
jID := model.NewJobID()
jIn := &model.Job{
JobID: jID,
JobType: model.JobTypeExperiment,
OwnerID: &user.ID,
}

err := AddJob(jIn)
require.NoError(t, err, "failed to add job")

// Add a task
tID := model.NewTaskID()
tIn := &model.Task{
TaskID: tID,
JobID: &jID,
TaskType: model.TaskTypeTrial,
StartTime: time.Now().UTC().Truncate(time.Millisecond),
}

err = AddTask(context.TODO(), tIn)
require.NoError(t, err, "failed to add task")

return jIn, tIn
}

type allocAggTest struct {
name string
tzQuery string
timeOfDay string
numSlots int
seconds int
}

func TestUpdateResourceAllocationAggregation(t *testing.T) {
require.NoError(t, etc.SetRootPath(RootFromDB))

db, closeDB := MustResolveTestPostgres(t)
defer closeDB()
MustMigrateTestPostgres(t, db, MigrationsFromDB)

ctx := context.Background()
bunDB := bun.NewDB(db.sql.DB, pgdialect.New())

today := time.Now()

tests := []allocAggTest{
{
name: "UTC basic add",
tzQuery: `SET TIME ZONE 'Etc/UTC'`,
timeOfDay: "04:20:00AM",
numSlots: 5,
seconds: 5,
},
{
name: "Positive UTC offset",
tzQuery: `SET TIME ZONE 'Europe/Athens'`,
timeOfDay: "11:30:00PM",
numSlots: 5,
seconds: 10,
},
{
name: "Negative UTC offset",
tzQuery: `SET TIME ZONE 'America/Montreal'`,
timeOfDay: "01:20:00AM",
numSlots: 7,
seconds: 2,
},
}

type resourceAggregate struct {
bun.BaseModel `bun:"table:resource_aggregates"`
Date time.Time `bun:"date"`
Seconds float64 `bun:"seconds"`
}

for ind, test := range tests {
t.Run(test.name, func(t *testing.T) {
offset := -1 * (480 - (24 * ind))
recentDate := today.Add(time.Hour * time.Duration(offset))
formattedDate, prevSeconds := setupUpdateResourceAllocationAggregation(ctx, t, db,
bunDB, recentDate, test)
err := db.UpdateResourceAllocationAggregation()
require.NoError(t, err)

ra := resourceAggregate{}
err = bunDB.NewSelect().
Model(&ra).
Where("date = ? AND aggregation_type = ?", formattedDate, "total").
Scan(ctx)
require.NoError(t, err)

var expectedSeconds interface{} = float64(test.numSlots*test.seconds) + prevSeconds
var seconds interface{} = ra.Seconds
require.InEpsilon(t, expectedSeconds, seconds, 0.5)
})
}
}

func setupUpdateResourceAllocationAggregation(ctx context.Context, t *testing.T, db *PgDB,
bunDB *bun.DB, recentDate time.Time, test allocAggTest,
) (string, float64) {
// (Setup) Set the timezone.
_, err := bunDB.NewRaw(test.tzQuery).Exec(ctx)
require.NoError(t, err)

_, tIn := CreateMockJobAndTask(t, db)
tID := tIn.TaskID

formattedDate := recentDate.Format(time.DateOnly)
yearMonthDay := strings.Split(formattedDate, "-")
startDate := fmt.Sprintf("%s/%s %s %s +00", yearMonthDay[1], yearMonthDay[2], test.timeOfDay,
yearMonthDay[0])

// (Setup) the allocation's start and end time.
startTime, err := time.Parse("01/02 03:04:05PM 2006 -07", startDate)
require.NoError(t, err)

var prevSeconds float64
err = bunDB.NewRaw(`
WITH d AS (
SELECT
tsrange(
?::timestamp,
(?::timestamp + interval '1 day')
) AS period
),
allocs_in_range AS (
SELECT
extract(
EPOCH
FROM
upper(d.period * alloc.range) - lower(d.period * alloc.range)
) * alloc.slots::float AS seconds
FROM
(
SELECT
slots,
tsrange(start_time, greatest(start_time, end_time)) AS range
FROM
allocations
WHERE
start_time IS NOT NULL
) AS alloc,
d
WHERE
d.period && alloc.range
)
SELECT coalesce(sum(allocs_in_range.seconds), 0) FROM allocs_in_range
`, formattedDate, formattedDate).Scan(ctx, &prevSeconds)
require.NoError(t, err)

endTime := startTime.Add(time.Second * time.Duration(test.seconds))
allocID := uuid.NewString()
alloc := model.Allocation{
AllocationID: *model.NewAllocationID(&allocID),
TaskID: tID,
Slots: test.numSlots,
ResourcePool: uuid.NewString(),
StartTime: &startTime,
EndTime: &endTime,
}
_, err = bunDB.NewDelete().
Table("resource_aggregates").
Where("date >= ?", formattedDate).
Exec(ctx)
require.NoError(t, err)

_, err = bunDB.NewInsert().Model(&alloc).Exec(ctx)
require.NoError(t, err)

return formattedDate, prevSeconds
}
8 changes: 4 additions & 4 deletions master/static/srv/update_aggregated_allocation.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
WITH const AS (
SELECT
tstzrange(
$1::timestamptz,
($1::timestamptz + interval '1 day')
tsrange(
$1::timestamp,
($1::timestamp + interval '1 day')
) AS period
),

Expand All @@ -21,7 +21,7 @@ allocs_in_range AS (
(
SELECT
*,
tstzrange(start_time, greatest(start_time, end_time)) AS range
tsrange(start_time, greatest(start_time, end_time)) AS range
FROM
allocations
WHERE
Expand Down
Loading