Skip to content

Commit

Permalink
add uncloud machine init command to initialise a new cluster on loc…
Browse files Browse the repository at this point in the history
…al machine
  • Loading branch information
psviderski committed Sep 5, 2024
1 parent 58fb310 commit 3f71aec
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 51 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
.PHONY: build
uncloudd-dev1:
GOOS=linux GOARCH=amd64 go build -o uncloudd-linux-amd64 cmd/uncloudd/main.go && \
GOOS=linux GOARCH=amd64 go build -o uncloudd-linux-amd64 ./cmd/uncloudd && \
scp uncloudd-linux-amd64 [email protected]:~/ && \
ssh [email protected] sudo install ./uncloudd-linux-amd64 /usr/local/bin/uncloudd
GOOS=linux GOARCH=amd64 go build -o uncloud-linux-amd64 ./cmd/uncloud && \
scp uncloud-linux-amd64 [email protected]:~/ && \
ssh [email protected] sudo install ./uncloud-linux-amd64 /usr/local/bin/uncloud

.PHONY: proto
proto:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/machine/cluster/pb/cluster.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/machine/api/pb/cluster.proto
64 changes: 64 additions & 0 deletions cmd/uncloud/machine/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package machine

import (
"fmt"
"github.com/spf13/cobra"
"net/netip"
"uncloud/internal/machine"
"uncloud/internal/machine/api/pb"
"uncloud/internal/machine/daemon"
"uncloud/internal/machine/network"
"uncloud/internal/secret"
)

// initOptions collects the flag values of the "init" command.
type initOptions struct {
	// name is the machine name ("--name") and network is the cluster
	// network CIDR ("--network").
	name, network string
	// userPublicKey is the hex-encoded public key of a user to add to the
	// cluster ("--user-pubkey"); empty means no user is added.
	userPublicKey string
	// dataDir is the directory for persistent machine state ("--data-dir").
	dataDir string
}

// NewInitCommand builds the "init" subcommand, which initialises a new cluster
// via daemon.InitCluster using the values of its flags.
func NewInitCommand() *cobra.Command {
	var opts initOptions

	initCmd := &cobra.Command{
		Use:   "init",
		Short: "Initialise a new cluster that consists of the local or remote machine",
		RunE: func(_ *cobra.Command, _ []string) error {
			prefix, parseErr := netip.ParsePrefix(opts.network)
			if parseErr != nil {
				return fmt.Errorf("parse network CIDR: %w", parseErr)
			}

			// The user list stays nil unless a public key was supplied.
			var clusterUsers []*pb.User
			if hexKey := opts.userPublicKey; hexKey != "" {
				key, keyErr := secret.FromHexString(hexKey)
				if keyErr != nil {
					return fmt.Errorf("parse user's public key: %w", keyErr)
				}
				// The user's management IP is derived from their public key.
				clusterUsers = append(clusterUsers, &pb.User{
					Network: &pb.NetworkConfig{
						ManagementIp: pb.NewIP(network.ManagementIP(key)),
						PublicKey:    key,
					},
				})
			}

			if initErr := daemon.InitCluster(opts.dataDir, opts.name, prefix, clusterUsers); initErr != nil {
				return fmt.Errorf("initialise cluster: %w", initErr)
			}
			return nil
		},
	}

	flags := initCmd.Flags()
	flags.StringVarP(&opts.name, "name", "n", "", "Assign a name to the machine")
	flags.StringVar(&opts.network, "network", network.DefaultNetwork.String(),
		"IPv4 network CIDR to use for machines and services")
	flags.StringVarP(&opts.userPublicKey, "user-pubkey", "u", "",
		"User's public key which will be able to access the cluster (hex-encoded)")
	flags.StringVarP(&opts.dataDir, "data-dir", "d", machine.DefaultDataDir,
		"Directory for storing persistent machine state")

	// The result is deliberately discarded: the "data-dir" flag was registered just above.
	_ = initCmd.MarkFlagDirname("data-dir")

	return initCmd
}
1 change: 1 addition & 0 deletions cmd/uncloud/machine/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ func NewRootCommand() *cobra.Command {
}
cmd.AddCommand(
NewAddCommand(),
NewInitCommand(),
)
return cmd
}
11 changes: 9 additions & 2 deletions cmd/uncloudd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ func main() {
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
return daemon.Run(cmd.Context(), dataDir)
d, err := daemon.New(dataDir)
if err != nil {
return err
}
if err = d.Run(cmd.Context()); err == nil {
slog.Info("Daemon stopped.")
}
return err
},
}
cmd.PersistentFlags().StringVarP(&dataDir, "data-dir", "d", machine.DefaultDataDir,
Expand All @@ -41,5 +48,5 @@ func main() {
}()

cobra.CheckErr(cmd.ExecuteContext(ctx))
slog.Info("Daemon stopped.")

}
20 changes: 0 additions & 20 deletions internal/machine/api/server.go

This file was deleted.

153 changes: 129 additions & 24 deletions internal/machine/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,150 @@ package daemon

import (
"context"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"log/slog"
"net"
"net/netip"
"os"
"strconv"
"uncloud/internal/machine"
"uncloud/internal/machine/api"
"uncloud/internal/machine/api/pb"
"uncloud/internal/machine/cluster"
"uncloud/internal/machine/network"
)

const (
// MachineAPIPort is the TCP port that is joined with the machine's management IP
// to form the address the machine API listens on.
MachineAPIPort = 51000
)
// InitCluster resets the local machine and initialises a new cluster with it.
// TODO: ideally, this should be an RPC call to the daemon API to correctly handle the leave request and reconfiguration.
func InitCluster(dataDir, machineName string, netPrefix netip.Prefix, users []*pb.User) error {
var err error
if machineName == "" {
machineName, err = machine.NewRandomName()
if err != nil {
return fmt.Errorf("generate machine name: %w", err)
}
}
privKey, pubKey, err := network.NewMachineKeys()
if err != nil {
return fmt.Errorf("generate machine keys: %w", err)
}

state := cluster.NewState(cluster.StatePath(dataDir))
c := cluster.NewCluster(state, "")
if err = c.SetNetwork(netPrefix); err != nil {
return fmt.Errorf("set cluster network: %w", err)
}

// Use all routable addresses as endpoints.
addrs, err := network.ListRoutableAddresses()
if err != nil {
return fmt.Errorf("list routable addresses: %w", err)
}
endpoints := make([]*pb.IPPort, len(addrs))
for i, addr := range addrs {
addrPort := netip.AddrPortFrom(addr, network.WireGuardPort)
endpoints[i] = pb.NewIPPort(addrPort)
}
// Register the new machine in the cluster to populate the state and get its ID and subnet.
req := &pb.AddMachineRequest{
Name: machineName,
Network: &pb.NetworkConfig{
Endpoints: endpoints,
PublicKey: pubKey,
},
}
resp, err := c.AddMachine(context.Background(), req)
if err != nil {
return fmt.Errorf("add machine to cluster: %w", err)
}

m := resp.Machine
subnet, err := m.Network.Subnet.ToPrefix()
if err != nil {
return err
}
manageIP, err := m.Network.ManagementIp.ToAddr()
if err != nil {
return err
}
mcfg := &machine.Config{
ID: m.Id,
Name: m.Name,
Network: &network.Config{
Subnet: subnet,
ManagementIP: manageIP,
PrivateKey: privKey,
PublicKey: pubKey,
},
}

// Add users to the cluster and build peers config from them.
peers := make([]network.PeerConfig, len(users))
for i, u := range users {
if err = c.AddUser(u); err != nil {
return fmt.Errorf("add user to cluster: %w", err)
}
userManageIP, uErr := u.Network.ManagementIp.ToAddr()
if uErr != nil {
return uErr
}
peers[i] = network.PeerConfig{
ManagementIP: userManageIP,
PublicKey: u.Network.PublicKey,
}
}
mcfg.Network.Peers = peers

mcfg.SetPath(machine.ConfigPath(dataDir))
if err = mcfg.Save(); err != nil {
return fmt.Errorf("save machine config: %w", err)
}

func Run(ctx context.Context, dataDir string) error {
fmt.Printf("Cluster initialised with machine %q\n", m.Name)
return nil
}

// Daemon bundles the machine config and the cluster instance that New
// prepares and Run operates on.
type Daemon struct {
// config is the machine config parsed by New; Run passes its Network part
// to the WireGuard network configuration.
config *machine.Config
// cluster is the cluster instance that Run starts and stops.
cluster *cluster.Cluster
}

// New creates a Daemon for the machine whose persistent data lives in dataDir.
//
// It parses the machine config at machine.ConfigPath(dataDir) and loads the cluster
// state from cluster.StatePath(dataDir). When the state file does not exist, a new
// state is saved at that path; any other load error is returned. The cluster is
// created with the machine API address, built from the machine's management IP and
// machine.APIPort.
func New(dataDir string) (*Daemon, error) {
	cfg, err := machine.ParseConfig(machine.ConfigPath(dataDir))
	if err != nil {
		return nil, fmt.Errorf("load machine config: %w", err)
	}

	statePath := cluster.StatePath(dataDir)
	state := cluster.NewState(statePath)
	if err = state.Load(); err != nil {
		// Only a missing state file is recoverable; anything else is fatal.
		if !errors.Is(err, os.ErrNotExist) {
			return nil, fmt.Errorf("load cluster state: %w", err)
		}
		slog.Info("No cluster state found, creating a new one.", "path", statePath)
		if err = state.Save(); err != nil {
			return nil, fmt.Errorf("save cluster state: %w", err)
		}
	}

	apiAddr := net.JoinHostPort(cfg.Network.ManagementIP.String(), strconv.Itoa(machine.APIPort))
	c := cluster.NewCluster(state, apiAddr)

	return &Daemon{
		config:  cfg,
		cluster: c,
	}, nil
}

func (d *Daemon) Run(ctx context.Context) error {
wgnet, err := network.NewWireGuardNetwork()
if err != nil {
return fmt.Errorf("create WireGuard network: %w", err)
}
if err = wgnet.Configure(*cfg.Network); err != nil {
if err = wgnet.Configure(*d.config.Network); err != nil {
return fmt.Errorf("configure WireGuard network: %w", err)
}

//ctx, cancel := context.WithCancel(context.Background())
//go wgnet.WatchEndpoints(ctx, peerEndpointChangeNotifier)

Expand All @@ -41,20 +155,12 @@ func Run(ctx context.Context, dataDir string) error {
//}
//fmt.Println("Addresses:", addrs)

apiAddr := net.JoinHostPort(cfg.Network.ManagementIP.String(), strconv.Itoa(MachineAPIPort))
listener, err := net.Listen("tcp", apiAddr)
if err != nil {
return fmt.Errorf("listen API port: %w", err)
}
grpcServer := grpc.NewServer()
pb.RegisterClusterServer(grpcServer, api.NewServer())

// Use an errgroup to coordinate error handling and graceful shutdown of multiple daemon components.
errGroup, ctx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
slog.Info("Starting API server.", "addr", apiAddr)
if sErr := grpcServer.Serve(listener); sErr != nil {
return fmt.Errorf("API server failed: %w", sErr)
slog.Info("Starting cluster.")
if err = d.cluster.Run(); err != nil {
return fmt.Errorf("cluster failed: %w", err)
}
return nil
})
Expand All @@ -67,10 +173,9 @@ func Run(ctx context.Context, dataDir string) error {
// Shutdown goroutine.
errGroup.Go(func() error {
<-ctx.Done()
slog.Info("Stopping API server.")
// TODO: implement timeout for graceful shutdown.
grpcServer.GracefulStop()
slog.Info("API server stopped.")
slog.Info("Stopping cluster.")
d.cluster.Stop()
slog.Info("Cluster stopped.")
return nil
})

Expand Down
15 changes: 12 additions & 3 deletions internal/secret/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ import (

type Secret []byte

// FromHexString decodes the hex-encoded string s into a Secret.
// On malformed input it returns a nil Secret and an error wrapping the decoder's error.
func FromHexString(s string) (Secret, error) {
	raw, decodeErr := hex.DecodeString(s)
	if decodeErr == nil {
		return raw, nil
	}
	// Drop any partially decoded bytes; callers only get a result on success.
	return nil, fmt.Errorf("invalid hex-encoded secret: %w", decodeErr)
}

// String returns the hex-encoded string representation of the secret.
//
//goland:noinspection GoMixedReceiverTypes
Expand All @@ -23,11 +32,11 @@ func (s Secret) MarshalText() ([]byte, error) {

// UnmarshalText decodes the hex-encoded text into the secret.
// The error from FromHexString is returned unchanged because it already carries
// the "invalid hex-encoded secret" context; the secret is left untouched on error.
//
//goland:noinspection GoMixedReceiverTypes
func (s *Secret) UnmarshalText(text []byte) error {
	secret, err := FromHexString(string(text))
	if err != nil {
		return err
	}
	*s = secret
	return nil
}

Expand Down

0 comments on commit 3f71aec

Please sign in to comment.