Skip to content

Commit

Permalink
Adds microceph client configuration support for rbd_cache
Browse files Browse the repository at this point in the history
Signed-off-by: Utkarsh Bhatt <[email protected]>
  • Loading branch information
UtkarshBhatthere committed Oct 5, 2023
1 parent b975e5b commit b1a0401
Show file tree
Hide file tree
Showing 24 changed files with 1,501 additions and 57 deletions.
179 changes: 179 additions & 0 deletions microceph/api/client_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package api

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"

"github.com/canonical/lxd/lxd/response"
"github.com/canonical/lxd/shared/logger"
"github.com/canonical/microceph/microceph/api/types"
"github.com/canonical/microceph/microceph/ceph"
"github.com/canonical/microceph/microceph/client"
"github.com/canonical/microceph/microceph/common"
"github.com/canonical/microceph/microceph/database"
"github.com/canonical/microcluster/rest"
"github.com/canonical/microcluster/state"
)

// Top level client API
var clientCmd = rest.Endpoint{
Path: "client",
Get: rest.EndpointAction{Handler: cmdClientGet, ProxyTarget: true},
}

func cmdClientGet(s *state.State, r *http.Request) response.Response {
return response.EmptySyncResponse
}

// client configs API
var clientConfigsCmd = rest.Endpoint{
Path: "client/configs",
Put: rest.EndpointAction{Handler: cmdClientConfigsPut, ProxyTarget: true},
Get: rest.EndpointAction{Handler: cmdClientConfigsGet, ProxyTarget: true},
}

func cmdClientConfigsGet(s *state.State, r *http.Request) response.Response {
var req types.ClientConfig
var configs database.ClientConfigItems

err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
return response.InternalError(err)
}

if len(req.Host) > 0 {
configs, err = database.ClientConfigQuery.GetAllForHost(s, req.Host)
} else {
configs, err = database.ClientConfigQuery.GetAll(s)
}
if err != nil {
logger.Errorf("failed fetching client configs: %v for %v", err, req)
return response.SyncResponse(false, nil)
}

return response.SyncResponse(true, configs.GetClientConfigSlice())
}

// Implements the render .conf file at that particular host.
func cmdClientConfigsPut(s *state.State, r *http.Request) response.Response {
// Check if microceph is bootstrapped.
err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error {
isFsid, err := database.ConfigItemExists(ctx, tx, "fsid")
if err != nil || !isFsid {
return fmt.Errorf("cluster is not bootstrapped yet: %v", err)
}
return nil
})
if err != nil {
logger.Error(err.Error())
return response.SyncResponse(false, nil)
}

err = ceph.UpdateConfig(common.CephState{State: s})
if err != nil {
logger.Error(err.Error())
return response.SyncResponse(false, nil)
}

return response.EmptySyncResponse
}

// client configs key API
var clientConfigsKeyCmd = rest.Endpoint{
Path: "client/configs/{key}",
Put: rest.EndpointAction{Handler: clientConfigsKeyPut, ProxyTarget: true},
Get: rest.EndpointAction{Handler: clientConfigsKeyGet, ProxyTarget: true},
Delete: rest.EndpointAction{Handler: clientConfigsKeyDelete, ProxyTarget: true},
}

func clientConfigsKeyGet(s *state.State, r *http.Request) response.Response {
var req types.ClientConfig
var configs database.ClientConfigItems

err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
return response.InternalError(err)
}

if len(req.Host) > 0 {
configs, err = database.ClientConfigQuery.GetAllForKeyAndHost(s, req.Key, req.Host)
} else {
configs, err = database.ClientConfigQuery.GetAllForKey(s, req.Key)
}
if err != nil {
logger.Errorf("failed fetching client configs: %v for %v", err, req)
return response.SyncResponse(false, nil)
}

return response.SyncResponse(true, configs.GetClientConfigSlice())
}

func clientConfigsKeyPut(s *state.State, r *http.Request) response.Response {
var req types.ClientConfig

err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
return response.InternalError(err)
}

// If new config request is for global configuration.
err = database.ClientConfigQuery.AddNew(s, req.Key, req.Value, req.Host)
if err != nil {
return response.InternalError(err)
}

// Trigger /conf file update across cluster.
clientConfigUpdate(s, req.Wait)

return response.EmptySyncResponse
}

func clientConfigsKeyDelete(s *state.State, r *http.Request) response.Response {
var req types.ClientConfig

err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
return response.InternalError(err)
}

if len(req.Host) > 0 {
err = database.ClientConfigQuery.RemoveOneForKeyAndHost(s, req.Key, req.Host)
if err != nil {
return response.InternalError(err)
}
} else {
err = database.ClientConfigQuery.RemoveAllForKey(s, req.Key)
if err != nil {
return response.InternalError(err)
}
}

return response.EmptySyncResponse
}

// Perform ordered (one after other) updation of ceph.conf across the ceph cluster.
func clientConfigUpdate(s *state.State, wait bool) error {
if wait {
// Execute update conf synchronously
err := client.SendUpdateClientConfRequestToClusterMembers(common.CephState{State: s})
if err != nil {
return err
}

// Update on current host.
err = ceph.UpdateConfig(common.CephState{State: s})
if err != nil {
return err
}
} else { // Execute update asynchronously
go func() {
client.SendUpdateClientConfRequestToClusterMembers(common.CephState{State: s})
ceph.UpdateConfig(common.CephState{State: s}) // Restart on current host.
}()
}

return nil
}
3 changes: 3 additions & 0 deletions microceph/api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ var Endpoints = []rest.Endpoint{
mgrServiceCmd,
monServiceCmd,
rgwServiceCmd,
clientCmd,
clientConfigsCmd,
clientConfigsKeyCmd,
}
12 changes: 12 additions & 0 deletions microceph/api/types/client_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package types

// Configs holds the key value pair
type ClientConfig struct {
Key string `json:"key" yaml:"key"`
Value string `json:"value" yaml:"value"`
Host string `json:"host" yaml:"host"`
Wait bool `json:"wait" yaml:"wait"`
}

// Configs is a slice of configs
type ClientConfigs []ClientConfig
2 changes: 1 addition & 1 deletion microceph/ceph/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func Bootstrap(s common.StateInterface) error {
}

// Re-generate the configuration from the database.
err = updateConfig(s)
err = UpdateConfig(s)
if err != nil {
return fmt.Errorf("Failed to re-generate the configuration: %w", err)
}
Expand Down
58 changes: 58 additions & 0 deletions microceph/ceph/client_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ceph

import (
"fmt"
"reflect"

"github.com/canonical/microceph/microceph/common"
"github.com/canonical/microceph/microceph/database"
)

type ClientConfigT struct {
isCache string
cacheSize string
isCacheWritethrough string
cacheMaxDirty string
cacheTargetDirty string
}

func GetClientConfigForHost(s common.StateInterface, hostname string) (ClientConfigT, error) {
retval := ClientConfigT{}

// Get all client configs for the current host.
configs, err := database.ClientConfigQuery.GetAllForHost(s.ClusterState(), hostname)
if err != nil {
return ClientConfigT{}, fmt.Errorf("could not query database for client configs: %v", err)
}

for _, config := range configs {
// Populate client config table using the database values.
setterTable := GetClientConfigSet()
err = setFieldValue(&retval, fmt.Sprint(setterTable[config.Key]), config.Value)
if err != nil {
return ClientConfigT{}, fmt.Errorf("cailed object population: %v", err)
}
}

return retval, nil
}

func setFieldValue(ogp *ClientConfigT, field string, value string) error {
r := reflect.ValueOf(ogp)
f := reflect.Indirect(r).FieldByName(field)
if f.Kind() != reflect.Invalid {
f.SetString(value)
return nil
}
return fmt.Errorf("cannot set field %s.", field)
}

func GetClientConfigSet() Set {
return Set{
"rbd_cache": "isCache",
"rbd_cache_size": "cacheSize",
"rbd_cache_writethrough_until_flush": "isCacheWritethrough",
"rbd_cache_max_dirty": "cacheMaxDirty",
"rbd_cache_target_dirty": "cacheTargetDirty",
}
}
26 changes: 19 additions & 7 deletions microceph/ceph/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"

"github.com/canonical/lxd/shared/logger"
"github.com/canonical/microceph/microceph/api/types"
"github.com/canonical/microceph/microceph/common"
"github.com/canonical/microceph/microceph/database"
Expand Down Expand Up @@ -162,7 +163,7 @@ func ListConfigs() (types.Configs, error) {
}

// updates the ceph config file.
func updateConfig(s common.StateInterface) error {
func UpdateConfig(s common.StateInterface) error {
confPath := filepath.Join(os.Getenv("SNAP_DATA"), "conf")
runPath := filepath.Join(os.Getenv("SNAP_DATA"), "run")

Expand Down Expand Up @@ -207,14 +208,25 @@ func updateConfig(s common.StateInterface) error {

conf := newCephConfig(confPath)
address := s.ClusterState().Address().Hostname()
clientConfig, err := GetClientConfigForHost(s, s.ClusterState().Name())
if err != nil {
logger.Errorf("Failed to pull Client Configurations: %v", err)
return err
}

err = conf.WriteConfig(
map[string]any{
"fsid": config["fsid"],
"runDir": runPath,
"monitors": strings.Join(monitorAddresses, ","),
"addr": address,
"ipv4": strings.Contains(address, "."),
"ipv6": strings.Contains(address, ":"),
"fsid": config["fsid"],
"runDir": runPath,
"monitors": strings.Join(monitorAddresses, ","),
"addr": address,
"ipv4": strings.Contains(address, "."),
"ipv6": strings.Contains(address, ":"),
"isCache": clientConfig.isCache,
"cacheSize": clientConfig.cacheSize,
"isCacheWritethrough": clientConfig.isCacheWritethrough,
"cacheMaxDirty": clientConfig.cacheMaxDirty,
"cacheTargetDirty": clientConfig.cacheTargetDirty,
},
)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions microceph/ceph/configwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ auth allow insecure global id reclaim = false
public addr = {{.addr}}
ms bind ipv4 = {{.ipv4}}
ms bind ipv6 = {{.ipv6}}
[client]
rbd_cache = {{.isCache}}
rbd_cache_size = {{.cacheSize}}
rbd_cache_writethrough_until_flush = {{.isCacheWritethrough}}
rbd_cache_max_dirty = {{.cacheMaxDirty}}
rbd_cache_target_dirty = {{.cacheTargetDirty}}
`)),
configFile: "ceph.conf",
configDir: configDir,
Expand Down
2 changes: 1 addition & 1 deletion microceph/ceph/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func Join(s common.StateInterface) error {
}

// Generate the configuration from the database.
err := updateConfig(s)
err := UpdateConfig(s)
if err != nil {
return fmt.Errorf("Failed to generate the configuration: %w", err)
}
Expand Down
14 changes: 12 additions & 2 deletions microceph/ceph/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"github.com/canonical/microceph/microceph/common"
"os"
"path/filepath"
"time"
Expand All @@ -16,11 +15,22 @@ import (
"github.com/canonical/microcluster/state"

"github.com/canonical/microceph/microceph/api/types"
"github.com/canonical/microceph/microceph/common"
"github.com/canonical/microceph/microceph/database"
"github.com/tidwall/gjson"
)

type Set map[string]struct{}
type Set map[string]interface{}

func (sub Set) Keys() []string {
keys := make([]string, len(sub))

for key := range sub {
keys = append(keys, key)
}

return keys
}

func (sub Set) isIn(super Set) bool {
flag := true
Expand Down
2 changes: 1 addition & 1 deletion microceph/ceph/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Start(s common.StateInterface) error {
continue
}

err = updateConfig(s)
err = UpdateConfig(s)
if err != nil {
time.Sleep(10 * time.Second)
continue
Expand Down
Loading

0 comments on commit b1a0401

Please sign in to comment.