Skip to content

Commit

Permalink
chore: add tracing info to some backend APIs (#9841)
Browse files Browse the repository at this point in the history
(cherry picked from commit 2838af4)
  • Loading branch information
kkunapuli authored and determined-ci committed Aug 20, 2024
1 parent 46a400e commit 8acaee5
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 10 deletions.
10 changes: 9 additions & 1 deletion master/internal/api_agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"runtime/trace"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -20,7 +21,14 @@ import (
func (a *apiServer) GetAgents(
ctx context.Context, req *apiv1.GetAgentsRequest,
) (*apiv1.GetAgentsResponse, error) {
resp, err := a.m.rm.GetAgents()
ctx, task := trace.NewTask(ctx, "GetAgents")
defer task.End()

var resp *apiv1.GetAgentsResponse
var err error
trace.WithRegion(ctx, "resourceManager.GetAgents", func() {
resp, err = a.m.rm.GetAgents()
})
if err != nil {
return nil, err
}
Expand Down
17 changes: 12 additions & 5 deletions master/internal/api_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"runtime/trace"

"github.com/determined-ai/determined/master/internal/api"
"github.com/determined-ai/determined/master/internal/job/jobservice"
Expand Down Expand Up @@ -49,11 +50,17 @@ func (a *apiServer) GetJobs(
func (a *apiServer) GetJobsV2(
ctx context.Context, req *apiv1.GetJobsV2Request,
) (resp *apiv1.GetJobsV2Response, err error) {
jobs, err := jobservice.DefaultService.GetJobs(
rm.ResourcePoolName(req.ResourcePool),
req.OrderBy == apiv1.OrderBy_ORDER_BY_DESC,
req.States,
)
ctx, task := trace.NewTask(ctx, "GetJobsV2")
defer task.End()

var jobs []*jobv1.Job
trace.WithRegion(ctx, "jobservice.GetJobs", func() {
jobs, err = jobservice.DefaultService.GetJobs(
rm.ResourcePoolName(req.ResourcePool),
req.OrderBy == apiv1.OrderBy_ORDER_BY_DESC,
req.States,
)
})
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion master/internal/job/jobservice/jobservice.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package jobservice

import (
"context"
"database/sql"
"fmt"
"runtime/trace"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -96,10 +98,15 @@ func (s *Service) GetJobs(
desc bool,
states []jobv1.State,
) ([]*jobv1.Job, error) {
// waiting on this lock
s.mu.Lock()
defer s.mu.Unlock()

jobQ, err := s.rm.GetJobQ(resourcePool)
var jobQ map[model.JobID]*sproto.RMJobInfo
var err error
trace.WithRegion(context.Background(), "resourceManager.GetJobQ", func() {
jobQ, err = s.rm.GetJobQ(resourcePool)
})
if err != nil {
s.syslog.WithError(err).Error("getting job queue info from RM")
return nil, err
Expand Down
9 changes: 8 additions & 1 deletion master/internal/rm/kubernetesrm/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"os"
"path/filepath"
"runtime/trace"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -1049,9 +1050,15 @@ func (j *jobsService) refreshPodStates(allocationID model.AllocationID) error {
}

// GetAgents returns the result of j.getAgents while holding j.mu.
//
// The getAgents call is wrapped in a runtime/trace region named
// "jobsService.GetAgents". The region is opened only after j.mu has been
// acquired, so time spent waiting for the lock is not part of the region.
func (j *jobsService) GetAgents() (*apiv1.GetAgentsResponse, error) {
	// Callers may block here waiting on this lock.
	j.mu.Lock()
	defer j.mu.Unlock()

	var resp *apiv1.GetAgentsResponse
	var err error
	// NOTE(review): this method takes no context, so the region is opened on
	// context.Background() rather than on a caller-supplied context; it is
	// therefore not tied to any trace task the caller may have created.
	trace.WithRegion(context.Background(), "jobsService.GetAgents", func() {
		resp, err = j.getAgents()
	})
	return resp, err
}

func (j *jobsService) GetAgent(msg *apiv1.GetAgentRequest) *apiv1.GetAgentResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"fmt"
"maps"
"runtime/trace"
"slices"
"time"

Expand Down Expand Up @@ -239,7 +240,10 @@ func (k *ResourceManager) GetJobQ(rpName rm.ResourcePoolName) (map[model.JobID]*
if err != nil {
return nil, err
}
resp := rp.GetJobQ()
var resp map[model.JobID]*sproto.RMJobInfo
trace.WithRegion(context.Background(), "resourcepool.GetJobQ", func() {
resp = rp.GetJobQ()
})
return resp, nil
}

Expand Down
8 changes: 7 additions & 1 deletion master/internal/rm/kubernetesrm/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"runtime/trace"
"sync"

"github.com/google/uuid"
Expand Down Expand Up @@ -127,11 +128,16 @@ func (k *kubernetesResourcePool) PendingPreemption(msg sproto.PendingPreemption)
}

// GetJobQ returns the result of k.jobQInfo while holding k.mu.
//
// Before computing the result it sets k.tryAdmitPendingTasks to true. The
// jobQInfo call is wrapped in a runtime/trace region named
// "resourcepool.k8s.jobQInfo"; the region is opened only after k.mu has been
// acquired, so time spent waiting for the lock is not part of the region.
func (k *kubernetesResourcePool) GetJobQ() map[model.JobID]*sproto.RMJobInfo {
	// Callers may block here waiting on this lock.
	k.mu.Lock()
	defer k.mu.Unlock()
	// NOTE(review): presumably flags that pending tasks should be considered
	// for admission on a later pass; confirm against the flag's consumer.
	k.tryAdmitPendingTasks = true

	var resp map[model.JobID]*sproto.RMJobInfo
	// NOTE(review): this method takes no context, so the region is opened on
	// context.Background() rather than on a caller-supplied context; it is
	// therefore not tied to any trace task the caller may have created.
	trace.WithRegion(context.Background(), "resourcepool.k8s.jobQInfo", func() {
		resp = k.jobQInfo()
	})
	return resp
}

func (k *kubernetesResourcePool) GetJobQStats() *jobv1.QueueStats {
Expand Down

0 comments on commit 8acaee5

Please sign in to comment.