Skip to content

Commit

Permalink
Add initial admin page for managing metastore admin nodes (#3743)
Browse files Browse the repository at this point in the history
* Add initial admin page for managing metastore admin nodes

* Improve field name

* Close metastore client connections

* Remove todo

* Update comment

* Move metastore admin ownership outside of metastore

* Move admin methods to RaftNodeService

* Run mockery, fix broken test

* Make node management operation more robust to failures

* Run make fmt

* Use api specific request/response types

* Add clarification in info tooltip
  • Loading branch information
aleks-p authored Dec 12, 2024
1 parent e10086d commit 2e75416
Show file tree
Hide file tree
Showing 37 changed files with 4,062 additions and 768 deletions.
7 changes: 0 additions & 7 deletions cmd/profilecli/static/bootstrap-5.1.3.bundle.min.js

This file was deleted.

7 changes: 0 additions & 7 deletions cmd/profilecli/static/bootstrap-5.1.3.min.css

This file was deleted.

7 changes: 7 additions & 0 deletions cmd/profilecli/static/bootstrap-5.3.3.bundle.min.js

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions cmd/profilecli/static/bootstrap-5.3.3.min.css

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/api/api_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
queryv1 "github.com/grafana/pyroscope/api/gen/proto/go/query/v1"
segmentwriterv1 "github.com/grafana/pyroscope/api/gen/proto/go/segmentwriter/v1"
segmentwriter "github.com/grafana/pyroscope/pkg/experiment/ingester"
metastoreadmin "github.com/grafana/pyroscope/pkg/experiment/metastore/admin"
querybackend "github.com/grafana/pyroscope/pkg/experiment/query_backend"
)

Expand All @@ -26,3 +27,10 @@ func (a *API) RegisterSegmentWriterRing(r http.Handler) {
// RegisterQueryBackend registers svc as the QueryBackendService implementation
// on the API's gRPC server.
func (a *API) RegisterQueryBackend(svc *querybackend.QueryBackend) {
queryv1.RegisterQueryBackendServiceServer(a.server.GRPC, svc)
}

// RegisterMetastoreAdmin exposes the metastore node administration page under
// "/metastore-nodes" for GET and POST requests and adds a "Nodes" entry to the
// "Metastore" section of the index page.
//
// NOTE(review): the two boolean arguments to RegisterRoute are positional;
// their meaning is defined by RegisterRoute, which is not visible here.
func (a *API) RegisterMetastoreAdmin(adm *metastoreadmin.Admin) {
	const nodesPath = "/metastore-nodes"

	handler := adm.NodeListHandler()
	a.RegisterRoute(nodesPath, handler, false, true, "GET", "POST")

	links := []IndexPageLink{
		{Desc: "Nodes", Path: nodesPath},
	}
	a.indexPage.AddLinks(defaultWeight, "Metastore", links)
}
6 changes: 3 additions & 3 deletions pkg/api/index.gohtml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{{- /*gotype: github.com/grafana/pyroscope/pkg/api.indexPageContents */ -}}
<!DOCTYPE html>
<html>
<html data-bs-theme="dark">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">

<title>Grafana Pyroscope</title>

<link rel="stylesheet" href="{{ AddPathPrefix "/static/bootstrap-5.1.3.min.css" }}">
<link rel="stylesheet" href="{{ AddPathPrefix "/static/bootstrap-5.3.3.min.css" }}">
<link rel="stylesheet" href="{{ AddPathPrefix "/static/pyroscope-styles.css" }}">
<script src="{{ AddPathPrefix "/static/bootstrap-5.1.3.bundle.min.js" }}"></script>
<script src="{{ AddPathPrefix "/static/bootstrap-5.3.3.bundle.min.js" }}"></script>
</head>
<body>
<div class="d-flex flex-column container py-3">
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/memberlist_status.gohtml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
{{- /*gotype: github.com/grafana/dskit/kv/memberlist.StatusPageData */ -}}
<!DOCTYPE html>
<html class="h-100">
<html class="h-100" data-bs-theme="dark">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">

<title>Memberlist: Grafana Pyroscope</title>

<link rel="stylesheet" href="{{ AddPathPrefix "/static/bootstrap-5.1.3.min.css" }}">
<link rel="stylesheet" href="{{ AddPathPrefix "/static/bootstrap-5.3.3.min.css" }}">
<link rel="stylesheet" href="{{ AddPathPrefix "/static/bootstrap-icons-1.8.1.css" }}">
<link rel="stylesheet" href="{{ AddPathPrefix "/static/pyroscope-styles.css" }}">
<script src="{{ AddPathPrefix "/static/bootstrap-5.1.3.bundle.min.js" }}"></script>
<script src="{{ AddPathPrefix "/static/bootstrap-5.3.3.bundle.min.js" }}"></script>
</head>
<body class="d-flex flex-column h-100">
<main class="flex-shrink-0">
Expand Down
7 changes: 0 additions & 7 deletions pkg/api/static/bootstrap-5.1.3.bundle.min.js

This file was deleted.

7 changes: 0 additions & 7 deletions pkg/api/static/bootstrap-5.1.3.min.css

This file was deleted.

7 changes: 7 additions & 0 deletions pkg/api/static/bootstrap-5.3.3.bundle.min.js

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/api/static/bootstrap-5.3.3.min.css

Large diffs are not rendered by default.

241 changes: 241 additions & 0 deletions pkg/experiment/metastore/admin/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package admin

import (
"context"
"math"
"net/http"
"slices"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/grafana/pyroscope/pkg/experiment/metastore/discovery"
"github.com/grafana/pyroscope/pkg/experiment/metastore/raftnode/raftnodepb"
httputil "github.com/grafana/pyroscope/pkg/util/http"
)

// configChangeRequest carries the values of one node-management action
// submitted through the admin page form.
type configChangeRequest struct {
// serverId is the value of the submitted action field; the action handlers
// forward it as the ServerId of the outgoing request.
serverId string
// currentTerm is parsed from the "current-term" form field and forwarded as
// the CurrentTerm of the outgoing request. It stays zero when the field is
// not supplied.
currentTerm uint64
}

// formActionHandler executes a single form action for the given change
// request. A non-nil error is reported back to the HTTP client by the caller.
type formActionHandler func(http.ResponseWriter, *http.Request, configChangeRequest) error

// Admin backs the metastore node administration page: it keeps the list of
// discovered metastore servers and dispatches node-management form actions.
type Admin struct {
// service is the value returned by Service.
service services.Service

// logger receives warnings about nodes whose info could not be fetched.
logger log.Logger

// servers is the most recent list received through Servers, ordered by raft ID.
servers []discovery.Server
// leaderClient is the client through which the add/remove/promote/demote
// requests are sent.
leaderClient raftnodepb.RaftNodeServiceClient

// actionHandlers maps a form field name to the handler that runs when a POST
// request carries a non-empty value for that field.
actionHandlers map[string]formActionHandler
}

// Service returns the services.Service created for this Admin in New.
func (a *Admin) Service() services.Service {
return a.service
}

// metastoreClient pairs a RaftNodeService client with the gRPC connection it
// was created on, so that the connection can be closed once the call is done.
type metastoreClient struct {
raftnodepb.RaftNodeServiceClient
// conn is the connection underlying the embedded client.
conn *grpc.ClientConn
}

// New builds an Admin that sends node-management requests through client and
// subscribes to server discovery for metastoreAddress.
//
// The returned Admin already has its form action handlers registered and an
// idle service attached. An error is returned only when the discovery cannot
// be created.
//
// NOTE(review): the discovery instance is not retained after subscribing;
// confirm whether it needs to be stopped when the Admin service stops.
func New(
	client raftnodepb.RaftNodeServiceClient,
	logger log.Logger,
	metastoreAddress string,
) (*Admin, error) {
	adm := new(Admin)
	adm.leaderClient = client
	adm.logger = logger
	adm.actionHandlers = map[string]formActionHandler{}

	adm.addFormActionHandlers()
	adm.service = services.NewIdleService(adm.starting, adm.stopping)

	serverDiscovery, err := discovery.NewDiscovery(logger, metastoreAddress, nil)
	if err != nil {
		return nil, err
	}
	serverDiscovery.Subscribe(adm)

	return adm, nil
}

// starting is the start hook passed to services.NewIdleService; it does
// nothing and always succeeds.
func (a *Admin) starting(context.Context) error {
	return nil
}

// stopping is the stop hook passed to services.NewIdleService; it ignores the
// failure it is given and always succeeds.
func (a *Admin) stopping(error) error {
	return nil
}

// Servers replaces the known metastore servers with a copy of servers ordered
// by raft ID.
//
// The input slice is cloned before sorting so the caller's slice is neither
// reordered nor aliased. The copy is fully sorted before it is stored, so
// a.servers never refers to a slice that is still being reordered.
//
// NOTE(review): a.servers is also read by the HTTP handler without
// synchronization; confirm whether Servers can run concurrently with requests.
func (a *Admin) Servers(servers []discovery.Server) {
	sorted := slices.Clone(servers)
	slices.SortFunc(sorted, func(x, y discovery.Server) int {
		return strings.Compare(string(x.Raft.ID), string(y.Raft.ID))
	})
	a.servers = sorted
}

// NodeListHandler returns the HTTP handler behind the metastore nodes page.
//
// POST: the first registered action field (in lexical order of field names)
// that carries a non-empty form value selects the action; that value is used
// as the target server ID. An optional "current-term" form field is parsed as
// a base-10 uint64 and forwarded with the request. On success the handler
// answers 302 with Location "#"; a handler error is reported via
// httputil.Error. A "current-term" value that cannot be parsed is rejected
// with 400 Bad Request instead of being silently sent as term 0. A POST that
// carries no known action field falls through to rendering the page.
//
// Any other method: the raft state of all discovered servers is fetched and
// the nodes page template is rendered.
func (a *Admin) NodeListHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodPost {
			// Map iteration order is random; sort the field names so that the
			// selected action is deterministic if several fields are present.
			fieldNames := make([]string, 0, len(a.actionHandlers))
			for name := range a.actionHandlers {
				fieldNames = append(fieldNames, name)
			}
			slices.Sort(fieldNames)

			for _, fieldName := range fieldNames {
				serverID := r.FormValue(fieldName)
				if serverID == "" {
					continue
				}

				changeRequest := configChangeRequest{serverId: serverID}
				if rawTerm := r.FormValue("current-term"); rawTerm != "" {
					parsedTerm, err := strconv.ParseUint(rawTerm, 10, 64)
					if err != nil {
						// Never issue a membership change with a term the
						// client did not actually send.
						http.Error(w, "invalid current-term "+strconv.Quote(rawTerm), http.StatusBadRequest)
						return
					}
					changeRequest.currentTerm = parsedTerm
				}

				if err := a.actionHandlers[fieldName](w, r, changeRequest); err != nil {
					httputil.Error(w, err)
					return
				}
				// Redirect back to the page so that a refresh does not
				// resubmit the form.
				w.Header().Set("Location", "#")
				w.WriteHeader(http.StatusFound)
				return
			}
		}

		raftState, err := a.fetchRaftState(r.Context())
		if err != nil {
			httputil.Error(w, err)
			return
		}

		err = pageTemplates.nodesTemplate.Execute(w, nodesPageContent{
			DiscoveredServers: a.servers,
			Raft:              raftState,
			Now:               time.Now().UTC(),
		})
		if err != nil {
			httputil.Error(w, err)
		}
	})
}

// fetchRaftState queries every discovered server for its raft node info and
// aggregates the answers.
//
// A server for which a client cannot be created is skipped entirely. A server
// whose NodeInfo call fails, or whose response carries no node, is still
// listed, but only with its discovery ID and resolved address. A node counts
// as a raft member when it reports a non-empty leader ID; members are counted
// in NumNodes and their reported leader is tallied in ObservedLeaders.
//
// The returned error is currently always nil: per-node failures are logged as
// warnings instead of failing the whole page.
func (a *Admin) fetchRaftState(ctx context.Context) (*raftNodeState, error) {
	observedLeaders := make(map[string]int)
	numRaftNodes := 0
	nodes := make([]*metastoreNode, 0, len(a.servers))

	for _, s := range a.servers {
		cl, err := newClient(s.ResolvedAddress)
		if err != nil {
			level.Warn(a.logger).Log("msg", "missing client for server", "server", s, "err", err)
			continue
		}
		node := &metastoreNode{
			DiscoveryServerId: string(s.Raft.ID),
			ResolvedAddress:   s.ResolvedAddress,
		}
		nodes = append(nodes, node)

		res, err := cl.NodeInfo(ctx, &raftnodepb.NodeInfoRequest{})
		// Close right away rather than defer: a defer inside the loop would
		// keep every connection open until the function returns.
		_ = cl.conn.Close()

		if err != nil {
			level.Warn(a.logger).Log("msg", "error fetching node info", "server", s, "err", err)
			continue
		}
		if res == nil || res.Node == nil {
			// Guard against a response without a node payload, which would
			// otherwise cause a nil dereference below.
			level.Warn(a.logger).Log("msg", "node info response has no node", "server", s)
			continue
		}
		nInfo := res.Node

		node.RaftServerId = nInfo.ServerId
		node.Member = nInfo.LeaderId != ""
		node.State = nInfo.State
		node.LeaderId = nInfo.LeaderId
		node.ConfigIndex = nInfo.ConfigurationIndex
		node.NumPeers = len(nInfo.Peers)
		node.CurrentTerm = nInfo.CurrentTerm
		node.LastIndex = nInfo.LastIndex
		node.CommitIndex = nInfo.CommitIndex
		node.AppliedIndex = nInfo.AppliedIndex
		node.BuildVersion = nInfo.BuildVersion
		node.BuildRevision = nInfo.BuildRevision
		node.Stats = make(map[string]string)
		if stats := nInfo.Stats; stats != nil {
			for i, n := range stats.Name {
				if i >= len(stats.Value) {
					// Names and values are parallel lists; stop at the
					// shorter one instead of indexing out of range.
					break
				}
				node.Stats[n] = stats.Value[i]
			}
		}

		if node.Member {
			numRaftNodes++
			observedLeaders[node.LeaderId]++
		}
	}

	currentTerm := findCurrentTerm(nodes)

	return &raftNodeState{
		Nodes:           nodes,
		ObservedLeaders: observedLeaders,
		CurrentTerm:     currentTerm,
		NumNodes:        numRaftNodes,
	}, nil
}

// findCurrentTerm returns the raft term shared by all member nodes.
//
// Nodes that are not raft members are ignored. When no member is present, or
// when members report different terms, math.MaxUint64 is returned instead.
func findCurrentTerm(nodes []*metastoreNode) uint64 {
	// TODO aleks-p: in case of a mismatch in reported current terms, we bypass any validation
	const noAgreedTerm = uint64(math.MaxUint64)

	var (
		agreedTerm uint64
		seenMember bool
	)
	for _, n := range nodes {
		if !n.Member {
			continue
		}
		switch {
		case !seenMember:
			agreedTerm, seenMember = n.CurrentTerm, true
		case n.CurrentTerm != agreedTerm:
			// At least two distinct terms were reported.
			return noAgreedTerm
		}
	}
	if !seenMember {
		return noAgreedTerm
	}
	return agreedTerm
}

// newClient creates a RaftNodeService client for address over a gRPC
// connection configured with insecure (plaintext) transport credentials.
// The caller is responsible for closing the returned client's conn.
func newClient(address string) (*metastoreClient, error) {
	transportCreds := grpc.WithTransportCredentials(insecure.NewCredentials())

	conn, err := grpc.NewClient(address, transportCreds)
	if err != nil {
		return nil, err
	}

	cl := &metastoreClient{conn: conn}
	cl.RaftNodeServiceClient = raftnodepb.NewRaftNodeServiceClient(conn)
	return cl, nil
}

// addFormActionHandlers registers the handlers for the "add", "remove",
// "promote" and "demote" form actions. Each handler forwards the server ID and
// current term of the change request through a.leaderClient and returns the
// error of that call.
func (a *Admin) addFormActionHandlers() {
	addNode := func(_ http.ResponseWriter, r *http.Request, cr configChangeRequest) error {
		req := &raftnodepb.AddNodeRequest{
			ServerId:    cr.serverId,
			CurrentTerm: cr.currentTerm,
		}
		_, err := a.leaderClient.AddNode(r.Context(), req)
		return err
	}
	removeNode := func(_ http.ResponseWriter, r *http.Request, cr configChangeRequest) error {
		req := &raftnodepb.RemoveNodeRequest{
			ServerId:    cr.serverId,
			CurrentTerm: cr.currentTerm,
		}
		_, err := a.leaderClient.RemoveNode(r.Context(), req)
		return err
	}
	promoteNode := func(_ http.ResponseWriter, r *http.Request, cr configChangeRequest) error {
		req := &raftnodepb.PromoteToLeaderRequest{
			ServerId:    cr.serverId,
			CurrentTerm: cr.currentTerm,
		}
		_, err := a.leaderClient.PromoteToLeader(r.Context(), req)
		return err
	}
	demoteNode := func(_ http.ResponseWriter, r *http.Request, cr configChangeRequest) error {
		req := &raftnodepb.DemoteLeaderRequest{
			ServerId:    cr.serverId,
			CurrentTerm: cr.currentTerm,
		}
		_, err := a.leaderClient.DemoteLeader(r.Context(), req)
		return err
	}

	a.actionHandlers["add"] = addNode
	a.actionHandlers["remove"] = removeNode
	a.actionHandlers["promote"] = promoteNode
	a.actionHandlers["demote"] = demoteNode
}
Loading

0 comments on commit 2e75416

Please sign in to comment.