From bd085744edc0840b77eb526c8322b343dff42f74 Mon Sep 17 00:00:00 2001 From: Jan Kaspar <2270833+jankaspar@users.noreply.github.com> Date: Wed, 29 Jan 2020 14:58:31 +0000 Subject: [PATCH 1/2] Expose cluster usage metric from server. --- internal/armada/metrics/metrics.go | 94 +++++++++++++++++++++----- internal/armada/scheduling/clusters.go | 6 +- internal/armada/server.go | 6 +- internal/armada/server/lease.go | 10 +-- 4 files changed, 87 insertions(+), 29 deletions(-) diff --git a/internal/armada/metrics/metrics.go b/internal/armada/metrics/metrics.go index 17f089d188d..3d592a0420a 100644 --- a/internal/armada/metrics/metrics.go +++ b/internal/armada/metrics/metrics.go @@ -4,27 +4,30 @@ import ( "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" - "github.com/G-Research/armada/internal/armada/api" "github.com/G-Research/armada/internal/armada/repository" "github.com/G-Research/armada/internal/armada/scheduling" + "github.com/G-Research/armada/internal/common" ) const metricPrefix = "armada_" -func ExposeDataMetrics(queueRepository repository.QueueRepository, jobRepository repository.JobRepository) *QueueInfoCollector { - collector := &QueueInfoCollector{queueRepository, jobRepository, map[*api.Queue]scheduling.QueuePriorityInfo{}} +func ExposeDataMetrics( + queueRepository repository.QueueRepository, + jobRepository repository.JobRepository, + usageRepository repository.UsageRepository, +) *QueueInfoCollector { + collector := &QueueInfoCollector{ + queueRepository, + jobRepository, + usageRepository} prometheus.MustRegister(collector) return collector } -type MetricRecorder interface { - RecordQueuePriorities(priorities map[*api.Queue]scheduling.QueuePriorityInfo) -} - type QueueInfoCollector struct { queueRepository repository.QueueRepository jobRepository repository.JobRepository - priorities map[*api.Queue]scheduling.QueuePriorityInfo + usageRepository repository.UsageRepository } var queueSizeDesc = prometheus.NewDesc( @@ -41,9 +44,19 @@ var queuePriorityDesc = prometheus.NewDesc( nil, ) -func (c *QueueInfoCollector) RecordQueuePriorities(priorities map[*api.Queue]scheduling.QueuePriorityInfo) { - c.priorities = priorities -} +var queueUsageDesc = prometheus.NewDesc( + metricPrefix+"queue_resource_usage", + "Resource usage of a queue", + []string{"cluster", "queueName", "resourceType"}, + nil, +) + +var clusterCapacityDesc = prometheus.NewDesc( + metricPrefix+"cluster_capacity", + "Resource usage of a queue", + []string{"cluster", "resourceType"}, + nil, +) func (c *QueueInfoCollector) Describe(desc chan<- *prometheus.Desc) { desc <- queueSizeDesc @@ -52,21 +65,70 @@ func (c *QueueInfoCollector) Describe(desc chan<- *prometheus.Desc) { func (c *QueueInfoCollector) Collect(metrics chan<- prometheus.Metric) { - for queue, priority := range c.priorities { - metrics <- prometheus.MustNewConstMetric(queuePriorityDesc, prometheus.GaugeValue, priority.Priority, queue.Name) + queues, e := c.queueRepository.GetAllQueues() + if e != nil { + log.Errorf("Error while getting queue metrics %s", e) + recordInvalidMetrics(metrics, e) + return } - queues := scheduling.GetPriorityMapQueues(c.priorities) - queueSizes, e := c.jobRepository.GetQueueSizes(queues) if e != nil { log.Errorf("Error while getting queue size metrics %s", e) - metrics <- prometheus.NewInvalidMetric(queueSizeDesc, e) + recordInvalidMetrics(metrics, e) return } + usageReports, e := c.usageRepository.GetClusterUsageReports() + if e != nil { + log.Errorf("Error while getting queue usage metrics %s", e) + recordInvalidMetrics(metrics, e) + return + } + + activeClusterReports := scheduling.FilterActiveClusters(usageReports) + clusterPriorities, e := c.usageRepository.GetClusterPriorities(scheduling.GetClusterReportIds(activeClusterReports)) + if e != nil { + log.Errorf("Error while getting queue priority metrics %s", e) + recordInvalidMetrics(metrics, e) + return + } + + queuePriority := scheduling.CalculateQueuesPriorityInfo(clusterPriorities, activeClusterReports, queues) + + for queue, priority := range queuePriority { + metrics <- prometheus.MustNewConstMetric(queuePriorityDesc, prometheus.GaugeValue, priority.Priority, queue.Name) + } + for i, q := range queues { metrics <- prometheus.MustNewConstMetric(queueSizeDesc, prometheus.GaugeValue, float64(queueSizes[i]), q.Name) + } + for cluster, report := range activeClusterReports { + for _, queueReport := range report.Queues { + for resourceType, value := range queueReport.Resources { + metrics <- prometheus.MustNewConstMetric( + queueUsageDesc, + prometheus.GaugeValue, + common.QuantityAsFloat64(value), + cluster, + queueReport.Name, + resourceType) + } + } + for resourceType, value := range report.ClusterCapacity { + metrics <- prometheus.MustNewConstMetric( + clusterCapacityDesc, + prometheus.GaugeValue, + common.QuantityAsFloat64(value), + cluster, + resourceType) + } } } + +func recordInvalidMetrics(metrics chan<- prometheus.Metric, e error) { + metrics <- prometheus.NewInvalidMetric(queueSizeDesc, e) + metrics <- prometheus.NewInvalidMetric(queuePriorityDesc, e) + metrics <- prometheus.NewInvalidMetric(queueUsageDesc, e) +} diff --git a/internal/armada/scheduling/clusters.go b/internal/armada/scheduling/clusters.go index 4dddf884246..f674f0e599d 100644 --- a/internal/armada/scheduling/clusters.go +++ b/internal/armada/scheduling/clusters.go @@ -6,11 +6,13 @@ import ( "github.com/G-Research/armada/internal/armada/api" ) -func FilterActiveClusters(reports map[string]*api.ClusterUsageReport, expiry time.Duration) map[string]*api.ClusterUsageReport { +const activeClusterExpiry = 10 * time.Minute + +func FilterActiveClusters(reports map[string]*api.ClusterUsageReport) map[string]*api.ClusterUsageReport { result := map[string]*api.ClusterUsageReport{} now := time.Now() for id, report := range reports { - if report.ReportTime.Add(expiry).After(now) { + if report.ReportTime.Add(activeClusterExpiry).After(now) { result[id] = report } } diff --git a/internal/armada/server.go b/internal/armada/server.go index 0ec1118503e..5539937d4d4 100644 --- a/internal/armada/server.go +++ b/internal/armada/server.go @@ -39,13 +39,11 @@ func Serve(config *configuration.ArmadaConfig) (*grpc.Server, *sync.WaitGroup) { eventRepository := repository.NewRedisEventRepository(eventsDb, config.EventRetention) - metricsRecorder := metrics.ExposeDataMetrics(queueRepository, jobRepository) - permissions := authorization.NewPrincipalPermissionChecker(config.PermissionGroupMapping, config.PermissionScopeMapping) submitServer := server.NewSubmitServer(permissions, jobRepository, queueRepository, eventRepository) usageServer := server.NewUsageServer(permissions, config.PriorityHalfTime, usageRepository) - aggregatedQueueServer := server.NewAggregatedQueueServer(permissions, config.Scheduling, jobRepository, queueRepository, usageRepository, eventRepository, metricsRecorder) + aggregatedQueueServer := server.NewAggregatedQueueServer(permissions, config.Scheduling, jobRepository, queueRepository, usageRepository, eventRepository) eventServer := server.NewEventServer(permissions, eventRepository) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.GrpcPort)) @@ -53,6 +51,8 @@ func Serve(config *configuration.ArmadaConfig) (*grpc.Server, *sync.WaitGroup) { log.Fatalf("failed to listen: %v", err) } + metrics.ExposeDataMetrics(queueRepository, jobRepository, usageRepository) + api.RegisterSubmitServer(grpcServer, submitServer) api.RegisterUsageServer(grpcServer, usageServer) api.RegisterAggregatedQueueServer(grpcServer, aggregatedQueueServer) diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index bd3774b1260..de5a3315974 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -15,7 +15,6 @@ import ( "github.com/G-Research/armada/internal/armada/authorization" "github.com/G-Research/armada/internal/armada/authorization/permissions" "github.com/G-Research/armada/internal/armada/configuration" - "github.com/G-Research/armada/internal/armada/metrics" "github.com/G-Research/armada/internal/armada/repository" "github.com/G-Research/armada/internal/armada/scheduling" "github.com/G-Research/armada/internal/common" @@ -30,7 +29,6 @@ type AggregatedQueueServer struct { queueRepository repository.QueueRepository usageRepository repository.UsageRepository eventRepository repository.EventRepository - metricRecorder metrics.MetricRecorder } func NewAggregatedQueueServer( @@ -40,7 +38,6 @@ func NewAggregatedQueueServer( queueRepository repository.QueueRepository, usageRepository repository.UsageRepository, eventRepository repository.EventRepository, - metricRecorder metrics.MetricRecorder, ) *AggregatedQueueServer { return &AggregatedQueueServer{ permissions: permissions, @@ -48,8 +45,7 @@ func NewAggregatedQueueServer( jobRepository: jobRepository, queueRepository: queueRepository, usageRepository: usageRepository, - eventRepository: eventRepository, - metricRecorder: metricRecorder} + eventRepository: eventRepository} } type leaseContext struct { @@ -89,7 +85,7 @@ func (q AggregatedQueueServer) LeaseJobs(ctx context.Context, request *api.Lease return nil, e } - activeClusterReports := scheduling.FilterActiveClusters(usageReports, 10*time.Minute) + activeClusterReports := scheduling.FilterActiveClusters(usageReports) clusterPriorities, e := q.usageRepository.GetClusterPriorities(scheduling.GetClusterReportIds(activeClusterReports)) if e != nil { return nil, e @@ -107,8 +103,6 @@ func (q AggregatedQueueServer) LeaseJobs(ctx context.Context, request *api.Lease activeQueuePriority := filterPriorityMapByKeys(queuePriority, activeQueues) slices := scheduling.SliceResource(scarcity, activeQueuePriority, resourcesToSchedule) - q.metricRecorder.RecordQueuePriorities(queuePriority) - jobs := []*api.Job{} limit := maxJobsPerLease From b7a99644cbf3c162d874ed77d2c7e912a764ae48 Mon Sep 17 00:00:00 2001 From: jankaspar <2270833+jankaspar@users.noreply.github.com> Date: Wed, 29 Jan 2020 15:10:09 +0000 Subject: [PATCH 2/2] Update internal/armada/metrics/metrics.go --- internal/armada/metrics/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/armada/metrics/metrics.go b/internal/armada/metrics/metrics.go index 3d592a0420a..560e799b8a3 100644 --- a/internal/armada/metrics/metrics.go +++ b/internal/armada/metrics/metrics.go @@ -53,7 +53,7 @@ var queueUsageDesc = prometheus.NewDesc( var clusterCapacityDesc = prometheus.NewDesc( metricPrefix+"cluster_capacity", - "Resource usage of a queue", + "Cluster capacity", []string{"cluster", "resourceType"}, nil, )