Expose cluster usage metric from server. #309

Merged: 2 commits, Jan 29, 2020
internal/armada/metrics/metrics.go (78 additions, 16 deletions)
@@ -4,27 +4,30 @@ import (
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"

"github.com/G-Research/armada/internal/armada/api"
"github.com/G-Research/armada/internal/armada/repository"
"github.com/G-Research/armada/internal/armada/scheduling"
"github.com/G-Research/armada/internal/common"
)

const metricPrefix = "armada_"

func ExposeDataMetrics(queueRepository repository.QueueRepository, jobRepository repository.JobRepository) *QueueInfoCollector {
collector := &QueueInfoCollector{queueRepository, jobRepository, map[*api.Queue]scheduling.QueuePriorityInfo{}}
func ExposeDataMetrics(
queueRepository repository.QueueRepository,
jobRepository repository.JobRepository,
usageRepository repository.UsageRepository,
) *QueueInfoCollector {
collector := &QueueInfoCollector{
queueRepository,
jobRepository,
usageRepository}
prometheus.MustRegister(collector)
return collector
}

type MetricRecorder interface {
RecordQueuePriorities(priorities map[*api.Queue]scheduling.QueuePriorityInfo)
}

type QueueInfoCollector struct {
queueRepository repository.QueueRepository
jobRepository repository.JobRepository
priorities map[*api.Queue]scheduling.QueuePriorityInfo
usageRepository repository.UsageRepository
}

var queueSizeDesc = prometheus.NewDesc(
@@ -41,9 +44,19 @@ var queuePriorityDesc = prometheus.NewDesc(
nil,
)

func (c *QueueInfoCollector) RecordQueuePriorities(priorities map[*api.Queue]scheduling.QueuePriorityInfo) {
c.priorities = priorities
}
var queueUsageDesc = prometheus.NewDesc(
metricPrefix+"queue_resource_usage",
"Resource usage of a queue",
[]string{"cluster", "queueName", "resourceType"},
nil,
)

var clusterCapacityDesc = prometheus.NewDesc(
metricPrefix+"cluster_capacity",
"Cluster capacity",
[]string{"cluster", "resourceType"},
nil,
)

func (c *QueueInfoCollector) Describe(desc chan<- *prometheus.Desc) {
desc <- queueSizeDesc
@@ -52,21 +65,70 @@ func (c *QueueInfoCollector) Describe(desc chan<- *prometheus.Desc) {

func (c *QueueInfoCollector) Collect(metrics chan<- prometheus.Metric) {

for queue, priority := range c.priorities {
metrics <- prometheus.MustNewConstMetric(queuePriorityDesc, prometheus.GaugeValue, priority.Priority, queue.Name)
queues, e := c.queueRepository.GetAllQueues()
if e != nil {
log.Errorf("Error while getting queue metrics %s", e)
recordInvalidMetrics(metrics, e)
return
}

queues := scheduling.GetPriorityMapQueues(c.priorities)

queueSizes, e := c.jobRepository.GetQueueSizes(queues)
if e != nil {
log.Errorf("Error while getting queue size metrics %s", e)
metrics <- prometheus.NewInvalidMetric(queueSizeDesc, e)
recordInvalidMetrics(metrics, e)
return
}

usageReports, e := c.usageRepository.GetClusterUsageReports()
if e != nil {
log.Errorf("Error while getting queue usage metrics %s", e)
recordInvalidMetrics(metrics, e)
return
}

activeClusterReports := scheduling.FilterActiveClusters(usageReports)
clusterPriorities, e := c.usageRepository.GetClusterPriorities(scheduling.GetClusterReportIds(activeClusterReports))
if e != nil {
log.Errorf("Error while getting queue priority metrics %s", e)
recordInvalidMetrics(metrics, e)
return
}

queuePriority := scheduling.CalculateQueuesPriorityInfo(clusterPriorities, activeClusterReports, queues)

for queue, priority := range queuePriority {
metrics <- prometheus.MustNewConstMetric(queuePriorityDesc, prometheus.GaugeValue, priority.Priority, queue.Name)
}

for i, q := range queues {
metrics <- prometheus.MustNewConstMetric(queueSizeDesc, prometheus.GaugeValue, float64(queueSizes[i]), q.Name)
}

for cluster, report := range activeClusterReports {
for _, queueReport := range report.Queues {
for resourceType, value := range queueReport.Resources {
metrics <- prometheus.MustNewConstMetric(
queueUsageDesc,
prometheus.GaugeValue,
common.QuantityAsFloat64(value),
cluster,
queueReport.Name,
resourceType)
}
}
for resourceType, value := range report.ClusterCapacity {
metrics <- prometheus.MustNewConstMetric(
clusterCapacityDesc,
prometheus.GaugeValue,
common.QuantityAsFloat64(value),
cluster,
resourceType)
}
}
}

func recordInvalidMetrics(metrics chan<- prometheus.Metric, e error) {
metrics <- prometheus.NewInvalidMetric(queueSizeDesc, e)
metrics <- prometheus.NewInvalidMetric(queuePriorityDesc, e)
metrics <- prometheus.NewInvalidMetric(queueUsageDesc, e)
}
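Reviewer note: the rewritten Collect path converts Kubernetes resource quantities with common.QuantityAsFloat64, a helper that is not part of this diff. A minimal sketch of what such a conversion typically looks like, assuming the usual MilliValue-based approach (the real helper in internal/common may differ):

package common

import "k8s.io/apimachinery/pkg/api/resource"

// QuantityAsFloat64 converts a resource.Quantity (e.g. "500m" CPU, "2Gi"
// memory) to a float64 gauge value. This is a guess at the implementation;
// note that MilliValue can overflow int64 for extremely large quantities.
func QuantityAsFloat64(q resource.Quantity) float64 {
	return float64(q.MilliValue()) / 1000
}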
internal/armada/scheduling/clusters.go (4 additions, 2 deletions)
@@ -6,11 +6,13 @@ import (
"github.com/G-Research/armada/internal/armada/api"
)

func FilterActiveClusters(reports map[string]*api.ClusterUsageReport, expiry time.Duration) map[string]*api.ClusterUsageReport {
const activeClusterExpiry = 10 * time.Minute

func FilterActiveClusters(reports map[string]*api.ClusterUsageReport) map[string]*api.ClusterUsageReport {
result := map[string]*api.ClusterUsageReport{}
now := time.Now()
for id, report := range reports {
if report.ReportTime.Add(expiry).After(now) {
if report.ReportTime.Add(activeClusterExpiry).After(now) {
result[id] = report
}
}
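With the expiry now the package constant activeClusterExpiry, callers pass only the report map. A small usage sketch (report times are made up; the filter reads nothing but ReportTime):

package main

import (
	"fmt"
	"time"

	"github.com/G-Research/armada/internal/armada/api"
	"github.com/G-Research/armada/internal/armada/scheduling"
)

func main() {
	reports := map[string]*api.ClusterUsageReport{
		"clusterA": {ReportTime: time.Now().Add(-1 * time.Minute)},  // fresh
		"clusterB": {ReportTime: time.Now().Add(-30 * time.Minute)}, // stale
	}
	active := scheduling.FilterActiveClusters(reports)
	fmt.Println(len(active)) // 1: clusterB is older than the 10-minute expiry
}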
internal/armada/server.go (3 additions, 3 deletions)
@@ -39,20 +39,20 @@ func Serve(config *configuration.ArmadaConfig) (*grpc.Server, *sync.WaitGroup) {

eventRepository := repository.NewRedisEventRepository(eventsDb, config.EventRetention)

metricsRecorder := metrics.ExposeDataMetrics(queueRepository, jobRepository)

permissions := authorization.NewPrincipalPermissionChecker(config.PermissionGroupMapping, config.PermissionScopeMapping)

submitServer := server.NewSubmitServer(permissions, jobRepository, queueRepository, eventRepository)
usageServer := server.NewUsageServer(permissions, config.PriorityHalfTime, usageRepository)
aggregatedQueueServer := server.NewAggregatedQueueServer(permissions, config.Scheduling, jobRepository, queueRepository, usageRepository, eventRepository, metricsRecorder)
aggregatedQueueServer := server.NewAggregatedQueueServer(permissions, config.Scheduling, jobRepository, queueRepository, usageRepository, eventRepository)
eventServer := server.NewEventServer(permissions, eventRepository)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.GrpcPort))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

metrics.ExposeDataMetrics(queueRepository, jobRepository, usageRepository)

api.RegisterSubmitServer(grpcServer, submitServer)
api.RegisterUsageServer(grpcServer, usageServer)
api.RegisterAggregatedQueueServer(grpcServer, aggregatedQueueServer)
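ExposeDataMetrics registers the collector with the default Prometheus registry via prometheus.MustRegister, so the new series show up wherever that registry is already being served. For reference, a minimal sketch of serving the default registry (Armada's actual metrics endpoint and port are configured elsewhere; the port below is made up):

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// promhttp.Handler() exposes prometheus.DefaultGatherer, the registry
	// that metrics.ExposeDataMetrics registers the QueueInfoCollector into.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9001", nil))
}

A scrape would then include series along these lines (labels and values illustrative only):

armada_queue_resource_usage{cluster="clusterA",queueName="queueA",resourceType="cpu"} 2.5
armada_cluster_capacity{cluster="clusterA",resourceType="cpu"} 96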
internal/armada/server/lease.go (2 additions, 8 deletions)
@@ -15,7 +15,6 @@ import (
"github.com/G-Research/armada/internal/armada/authorization"
"github.com/G-Research/armada/internal/armada/authorization/permissions"
"github.com/G-Research/armada/internal/armada/configuration"
"github.com/G-Research/armada/internal/armada/metrics"
"github.com/G-Research/armada/internal/armada/repository"
"github.com/G-Research/armada/internal/armada/scheduling"
"github.com/G-Research/armada/internal/common"
@@ -30,7 +29,6 @@ type AggregatedQueueServer struct {
queueRepository repository.QueueRepository
usageRepository repository.UsageRepository
eventRepository repository.EventRepository
metricRecorder metrics.MetricRecorder
}

func NewAggregatedQueueServer(
@@ -40,16 +38,14 @@
queueRepository repository.QueueRepository,
usageRepository repository.UsageRepository,
eventRepository repository.EventRepository,
metricRecorder metrics.MetricRecorder,
) *AggregatedQueueServer {
return &AggregatedQueueServer{
permissions: permissions,
schedulingConfig: schedulingConfig,
jobRepository: jobRepository,
queueRepository: queueRepository,
usageRepository: usageRepository,
eventRepository: eventRepository,
metricRecorder: metricRecorder}
eventRepository: eventRepository}
}

type leaseContext struct {
@@ -89,7 +85,7 @@ func (q AggregatedQueueServer) LeaseJobs(ctx context.Context, request *api.Lease
return nil, e
}

activeClusterReports := scheduling.FilterActiveClusters(usageReports, 10*time.Minute)
activeClusterReports := scheduling.FilterActiveClusters(usageReports)
clusterPriorities, e := q.usageRepository.GetClusterPriorities(scheduling.GetClusterReportIds(activeClusterReports))
if e != nil {
return nil, e
@@ -107,8 +103,6 @@ func (q AggregatedQueueServer) LeaseJobs(ctx context.Context, request *api.Lease
activeQueuePriority := filterPriorityMapByKeys(queuePriority, activeQueues)
slices := scheduling.SliceResource(scarcity, activeQueuePriority, resourcesToSchedule)

q.metricRecorder.RecordQueuePriorities(queuePriority)

jobs := []*api.Job{}
limit := maxJobsPerLease

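With the MetricRecorder hook gone, nothing pushes metric state into the collector; every series is recomputed from the repositories when Prometheus scrapes. The same pull model in miniature, using client_golang's GaugeFunc (metric name and value are examples, not part of this PR):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	// GaugeFunc evaluates its callback at scrape time, so the value is always
	// fresh; this is the same reason QueueInfoCollector.Collect now queries
	// the repositories instead of caching priorities pushed from LeaseJobs.
	reg.MustRegister(prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Name: "example_active_clusters",
			Help: "Clusters with a usage report fresher than the expiry.",
		},
		func() float64 { return 3 }, // stand-in for len(FilterActiveClusters(...))
	))

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName(), "=", mf.GetMetric()[0].GetGauge().GetValue())
	}
}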