Skip to content

Commit

Permalink
machine add: configure remote machine to join the cluster network
Browse files Browse the repository at this point in the history
  • Loading branch information
psviderski committed Sep 12, 2024
1 parent 4e042be commit bd53e3f
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 73 deletions.
156 changes: 83 additions & 73 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,53 @@ func (cli *CLI) SetCurrentCluster(name string) error {
return cli.config.Save()
}

func (cli *CLI) ConnectCluster(ctx context.Context, clusterName string) (*client.Client, error) {
if len(cli.config.Clusters) == 0 {
return nil, errors.New(
"no clusters found in the Uncloud config. " +
"Please initialise a cluster with `uncloud machine init` first",
)
}
if clusterName == "" {
// If the cluster is not specified, use the current cluster if set.
if cli.config.CurrentCluster == "" {
return nil, errors.New(
"the current cluster is not set in the Uncloud config. " +
"Please specify a cluster with the --cluster flag or set current_cluster in the config",
)
}
if _, ok := cli.config.Clusters[cli.config.CurrentCluster]; !ok {
return nil, fmt.Errorf(
"current cluster %q not found in the config. "+
"Please specify a cluster with the --cluster flag or update current_cluster in the config",
cli.config.CurrentCluster,
)
}
clusterName = cli.config.CurrentCluster
}

cfg, ok := cli.config.Clusters[clusterName]
if !ok {
return nil, fmt.Errorf("cluster %q not found in the config", clusterName)
}
if len(cfg.Connections) == 0 {
return nil, fmt.Errorf("no connection configurations found for cluster %q in the config", clusterName)
}

// TODO: iterate over all connections and try to connect to the cluster using the first successful connection.
conn := cfg.Connections[0]
user, host, port, err := conn.SSH.Parse()
if err != nil {
return nil, fmt.Errorf("parse SSH connection %q: %w", conn.SSH, err)
}
sshConfig := &connector.SSHConnectorConfig{
User: user,
Host: host,
Port: port,
}
return client.New(ctx, connector.NewSSHConnector(sshConfig))
}

func (cli *CLI) InitCluster(
ctx context.Context, remoteMachine *RemoteMachine, clusterName, machineName string, netPrefix netip.Prefix,
) error {
Expand Down Expand Up @@ -150,53 +197,6 @@ func (cli *CLI) initRemoteMachine(
return nil
}

func (cli *CLI) ConnectCluster(ctx context.Context, clusterName string) (*client.Client, error) {
if len(cli.config.Clusters) == 0 {
return nil, errors.New(
"no clusters found in the Uncloud config. " +
"Please initialise a cluster with `uncloud machine init` first",
)
}
if clusterName == "" {
// If the cluster is not specified, use the current cluster if set.
if cli.config.CurrentCluster == "" {
return nil, errors.New(
"the current cluster is not set in the Uncloud config. " +
"Please specify a cluster with the --cluster flag or set current_cluster in the config",
)
}
if _, ok := cli.config.Clusters[cli.config.CurrentCluster]; !ok {
return nil, fmt.Errorf(
"current cluster %q not found in the config. "+
"Please specify a cluster with the --cluster flag or update current_cluster in the config",
cli.config.CurrentCluster,
)
}
clusterName = cli.config.CurrentCluster
}

cfg, ok := cli.config.Clusters[clusterName]
if !ok {
return nil, fmt.Errorf("cluster %q not found in the config", clusterName)
}
if len(cfg.Connections) == 0 {
return nil, fmt.Errorf("no connection configurations found for cluster %q in the config", clusterName)
}

// TODO: iterate over all connections and try to connect to the cluster using the first successful connection.
conn := cfg.Connections[0]
user, host, port, err := conn.SSH.Parse()
if err != nil {
return nil, fmt.Errorf("parse SSH connection %q: %w", conn.SSH, err)
}
sshConfig := &connector.SSHConnectorConfig{
User: user,
Host: host,
Port: port,
}
return client.New(ctx, connector.NewSSHConnector(sshConfig))
}

func (cli *CLI) AddMachine(ctx context.Context, remoteMachine RemoteMachine, clusterName, machineName string) error {
c, err := cli.ConnectCluster(ctx, clusterName)
if err != nil {
Expand Down Expand Up @@ -238,6 +238,7 @@ func (cli *CLI) AddMachine(ctx context.Context, remoteMachine RemoteMachine, clu
return fmt.Errorf("parse remote machine token: %w", err)
}

// Register the machine in the cluster using its public key and endpoints from the token.
endpoints := make([]*pb.IPPort, len(token.Endpoints))
for i, addrPort := range token.Endpoints {
endpoints[i] = pb.NewIPPort(addrPort)
Expand All @@ -253,31 +254,41 @@ func (cli *CLI) AddMachine(ctx context.Context, remoteMachine RemoteMachine, clu
if err != nil {
return fmt.Errorf("add machine to cluster: %w", err)
}
fmt.Println("Machine added to cluster", addResp.Machine)

//joinReq := &pb.JoinClusterRequest{
// Machine: addResp.Machine,
//}

// TODO:
// --1. Establish a client connection to the remote machine.
// --2. Check if the machine is already provisioned and ask the user to reset it first.
// --3. Download and install the latest uncloudd binary by running the install shell script from GitHub.
// --4. Request token from the remote machine.
// --5. Add the machine to the cluster using its token and receive the added machine info.
// 6. Request the machine to join the cluster using the configuration token.
// 7. Save the machine's SSH connection details in the cluster config.

//name, connCfg, err := c.AddMachine(ctx, machineName, user, host, port, sshKeyPath)
//if err != nil {
// return fmt.Errorf("add machine to cluster %q: %w", cluster.Name(), err)
//}
//fmt.Printf("Machine %q added to cluster %q\n", name, cluster.Name())
//
//cli.config.Clusters[cluster.Name()].Connections = append(cli.config.Clusters[cluster.Name()].Connections, connCfg)
//if err = cli.config.Save(); err != nil {
// return fmt.Errorf("save config: %w", err)
//}

// List other machines in the cluster to include them in the join request.
listResp, err := c.ListMachines(ctx, &emptypb.Empty{})
if err != nil {
return fmt.Errorf("list cluster machines: %w", err)
}
otherMachines := make([]*pb.MachineInfo, 0, len(listResp.Machines)-1)
for _, m := range listResp.Machines {
if m.Id != addResp.Machine.Id {
otherMachines = append(otherMachines, m)
}
}

// Configure the remote machine to join the cluster.
joinReq := &pb.JoinClusterRequest{
Machine: addResp.Machine,
OtherMachines: otherMachines,
}
if _, err = machineClient.JoinCluster(ctx, joinReq); err != nil {
return fmt.Errorf("join cluster: %w", err)
}

fmt.Printf("Machine %q added to cluster\n", addResp.Machine.Name)

// Save the machine's SSH connection details in the cluster config.
connCfg := config.MachineConnection{
SSH: config.NewSSHDestination(remoteMachine.User, remoteMachine.Host, remoteMachine.Port),
}
if clusterName == "" {
clusterName = cli.config.CurrentCluster
}
cli.config.Clusters[clusterName].Connections = append(cli.config.Clusters[clusterName].Connections, connCfg)
if err = cli.config.Save(); err != nil {
return fmt.Errorf("save config: %w", err)
}

return nil
}
Expand All @@ -303,7 +314,6 @@ func (cli *CLI) ListMachines(ctx context.Context, clusterName string) error {
return fmt.Errorf("write header: %w", err)
}
// Print rows.
fmt.Println("listResp", len(listResp.Machines))
for _, m := range listResp.Machines {
subnet, _ := m.Network.Subnet.ToPrefix()
endpoints := make([]string, len(m.Network.Endpoints))
Expand Down
72 changes: 72 additions & 0 deletions internal/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,78 @@ func (m *Machine) InitCluster(ctx context.Context, req *pb.InitClusterRequest) (
return resp, nil
}

// JoinCluster resets the local machine and configures it to join an existing cluster.
func (m *Machine) JoinCluster(ctx context.Context, req *pb.JoinClusterRequest) (*emptypb.Empty, error) {
// TODO: a proper cluster leave mechanism and machine reset should be implemented later.
// For now assume the machine wasn't part of a cluster.

if req.Machine.Id == "" {
return nil, status.Error(codes.InvalidArgument, "machine ID not set")
}
if req.Machine.Name == "" {
return nil, status.Error(codes.InvalidArgument, "machine name not set")
}
if req.Machine.Network == nil {
return nil, status.Error(codes.InvalidArgument, "network not set")
}
if err := req.Machine.Network.Validate(); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid network config: %v", err)
}
if !m.state.Network.PublicKey.Equal(req.Machine.Network.PublicKey) {
return nil, status.Error(
codes.InvalidArgument, "public key in the request does not match the public key on the machine",
)
}

// Update the machine state with the provided cluster configuration.
subnet, _ := req.Machine.Network.Subnet.ToPrefix()
manageIP, _ := req.Machine.Network.ManagementIp.ToAddr()
m.state.ID = req.Machine.Id
m.state.Name = req.Machine.Name
m.state.Network = &network.Config{
Subnet: subnet,
ManagementIP: manageIP,
PrivateKey: m.state.Network.PrivateKey,
PublicKey: m.state.Network.PublicKey,
}

// Build a peers config from other cluster machines.
m.state.Network.Peers = make([]network.PeerConfig, 0, len(req.OtherMachines))
for _, om := range req.OtherMachines {
if err := om.Network.Validate(); err != nil {
continue
}
omSubnet, _ := om.Network.Subnet.ToPrefix()
omManageIP, _ := om.Network.ManagementIp.ToAddr()
omEndpoints := make([]netip.AddrPort, len(om.Network.Endpoints))
for i, ep := range om.Network.Endpoints {
addrPort, _ := ep.ToAddrPort()
omEndpoints[i] = addrPort
}
peer := network.PeerConfig{
Subnet: &omSubnet,
ManagementIP: omManageIP,
AllEndpoints: omEndpoints,
PublicKey: om.Network.PublicKey,
}
if len(omEndpoints) > 0 {
peer.Endpoint = &omEndpoints[0]
}
m.state.Network.Peers = append(m.state.Network.Peers, peer)
}

if err := m.state.Save(); err != nil {
return nil, status.Errorf(codes.Internal, "save machine state: %v", err)
}
slog.Info("Machine configured to join the cluster.", "id", m.state.ID, "name", m.state.Name)
// Signal that the machine is initialised as a member of a cluster.
m.initialised <- struct{}{}
// TODO: consider calling a synchronous method to reconfigure the network to return error if it fails.
// Alternatively a client can call another method to check the network status.

return &emptypb.Empty{}, nil
}

// Token returns the local machine's token that can be used for adding the machine to a cluster.
func (m *Machine) Token(_ context.Context, _ *emptypb.Empty) (*pb.TokenResponse, error) {
if len(m.state.Network.PublicKey) == 0 {
Expand Down

0 comments on commit bd53e3f

Please sign in to comment.