diff --git a/internal/cli/cli.go b/internal/cli/cli.go index d177e5e..a743708 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -72,6 +72,53 @@ func (cli *CLI) SetCurrentCluster(name string) error { return cli.config.Save() } +func (cli *CLI) ConnectCluster(ctx context.Context, clusterName string) (*client.Client, error) { + if len(cli.config.Clusters) == 0 { + return nil, errors.New( + "no clusters found in the Uncloud config. " + + "Please initialise a cluster with `uncloud machine init` first", + ) + } + if clusterName == "" { + // If the cluster is not specified, use the current cluster if set. + if cli.config.CurrentCluster == "" { + return nil, errors.New( + "the current cluster is not set in the Uncloud config. " + + "Please specify a cluster with the --cluster flag or set current_cluster in the config", + ) + } + if _, ok := cli.config.Clusters[cli.config.CurrentCluster]; !ok { + return nil, fmt.Errorf( + "current cluster %q not found in the config. "+ + "Please specify a cluster with the --cluster flag or update current_cluster in the config", + cli.config.CurrentCluster, + ) + } + clusterName = cli.config.CurrentCluster + } + + cfg, ok := cli.config.Clusters[clusterName] + if !ok { + return nil, fmt.Errorf("cluster %q not found in the config", clusterName) + } + if len(cfg.Connections) == 0 { + return nil, fmt.Errorf("no connection configurations found for cluster %q in the config", clusterName) + } + + // TODO: iterate over all connections and try to connect to the cluster using the first successful connection. + conn := cfg.Connections[0] + user, host, port, err := conn.SSH.Parse() + if err != nil { + return nil, fmt.Errorf("parse SSH connection %q: %w", conn.SSH, err) + } + sshConfig := &connector.SSHConnectorConfig{ + User: user, + Host: host, + Port: port, + } + return client.New(ctx, connector.NewSSHConnector(sshConfig)) +} + func (cli *CLI) InitCluster( ctx context.Context, remoteMachine *RemoteMachine, clusterName, machineName string, netPrefix netip.Prefix, ) error { @@ -150,53 +197,6 @@ func (cli *CLI) initRemoteMachine( return nil } -func (cli *CLI) ConnectCluster(ctx context.Context, clusterName string) (*client.Client, error) { - if len(cli.config.Clusters) == 0 { - return nil, errors.New( - "no clusters found in the Uncloud config. " + - "Please initialise a cluster with `uncloud machine init` first", - ) - } - if clusterName == "" { - // If the cluster is not specified, use the current cluster if set. - if cli.config.CurrentCluster == "" { - return nil, errors.New( - "the current cluster is not set in the Uncloud config. " + - "Please specify a cluster with the --cluster flag or set current_cluster in the config", - ) - } - if _, ok := cli.config.Clusters[cli.config.CurrentCluster]; !ok { - return nil, fmt.Errorf( - "current cluster %q not found in the config. "+ - "Please specify a cluster with the --cluster flag or update current_cluster in the config", - cli.config.CurrentCluster, - ) - } - clusterName = cli.config.CurrentCluster - } - - cfg, ok := cli.config.Clusters[clusterName] - if !ok { - return nil, fmt.Errorf("cluster %q not found in the config", clusterName) - } - if len(cfg.Connections) == 0 { - return nil, fmt.Errorf("no connection configurations found for cluster %q in the config", clusterName) - } - - // TODO: iterate over all connections and try to connect to the cluster using the first successful connection. - conn := cfg.Connections[0] - user, host, port, err := conn.SSH.Parse() - if err != nil { - return nil, fmt.Errorf("parse SSH connection %q: %w", conn.SSH, err) - } - sshConfig := &connector.SSHConnectorConfig{ - User: user, - Host: host, - Port: port, - } - return client.New(ctx, connector.NewSSHConnector(sshConfig)) -} - func (cli *CLI) AddMachine(ctx context.Context, remoteMachine RemoteMachine, clusterName, machineName string) error { c, err := cli.ConnectCluster(ctx, clusterName) if err != nil { @@ -238,6 +238,7 @@ func (cli *CLI) AddMachine(ctx context.Context, remoteMachine RemoteMachine, clu return fmt.Errorf("parse remote machine token: %w", err) } + // Register the machine in the cluster using its public key and endpoints from the token. endpoints := make([]*pb.IPPort, len(token.Endpoints)) for i, addrPort := range token.Endpoints { endpoints[i] = pb.NewIPPort(addrPort) @@ -253,31 +254,41 @@ func (cli *CLI) AddMachine(ctx context.Context, remoteMachine RemoteMachine, clu if err != nil { return fmt.Errorf("add machine to cluster: %w", err) } - fmt.Println("Machine added to cluster", addResp.Machine) - - //joinReq := &pb.JoinClusterRequest{ - // Machine: addResp.Machine, - //} - - // TODO: - // --1. Establish a client connection to the remote machine. - // --2. Check if the machine is already provisioned and ask the user to reset it first. - // --3. Download and install the latest uncloudd binary by running the install shell script from GitHub. - // --4. Request token from the remote machine. - // --5. Add the machine to the cluster using its token and receive the added machine info. - // 6. Request the machine to join the cluster using the configuration token. - // 7. Save the machine's SSH connection details in the cluster config. - - //name, connCfg, err := c.AddMachine(ctx, machineName, user, host, port, sshKeyPath) - //if err != nil { - // return fmt.Errorf("add machine to cluster %q: %w", cluster.Name(), err) - //} - //fmt.Printf("Machine %q added to cluster %q\n", name, cluster.Name()) - // - //cli.config.Clusters[cluster.Name()].Connections = append(cli.config.Clusters[cluster.Name()].Connections, connCfg) - //if err = cli.config.Save(); err != nil { - // return fmt.Errorf("save config: %w", err) - //} + + // List other machines in the cluster to include them in the join request. + listResp, err := c.ListMachines(ctx, &emptypb.Empty{}) + if err != nil { + return fmt.Errorf("list cluster machines: %w", err) + } + otherMachines := make([]*pb.MachineInfo, 0, len(listResp.Machines)-1) + for _, m := range listResp.Machines { + if m.Id != addResp.Machine.Id { + otherMachines = append(otherMachines, m) + } + } + + // Configure the remote machine to join the cluster. + joinReq := &pb.JoinClusterRequest{ + Machine: addResp.Machine, + OtherMachines: otherMachines, + } + if _, err = machineClient.JoinCluster(ctx, joinReq); err != nil { + return fmt.Errorf("join cluster: %w", err) + } + + fmt.Printf("Machine %q added to cluster\n", addResp.Machine.Name) + + // Save the machine's SSH connection details in the cluster config. + connCfg := config.MachineConnection{ + SSH: config.NewSSHDestination(remoteMachine.User, remoteMachine.Host, remoteMachine.Port), + } + if clusterName == "" { + clusterName = cli.config.CurrentCluster + } + cli.config.Clusters[clusterName].Connections = append(cli.config.Clusters[clusterName].Connections, connCfg) + if err = cli.config.Save(); err != nil { + return fmt.Errorf("save config: %w", err) + } return nil } @@ -303,7 +314,6 @@ func (cli *CLI) ListMachines(ctx context.Context, clusterName string) error { return fmt.Errorf("write header: %w", err) } // Print rows. - fmt.Println("listResp", len(listResp.Machines)) for _, m := range listResp.Machines { subnet, _ := m.Network.Subnet.ToPrefix() endpoints := make([]string, len(m.Network.Endpoints)) diff --git a/internal/machine/machine.go b/internal/machine/machine.go index 5a72015..f4bc9f2 100644 --- a/internal/machine/machine.go +++ b/internal/machine/machine.go @@ -411,6 +411,78 @@ func (m *Machine) InitCluster(ctx context.Context, req *pb.InitClusterRequest) ( return resp, nil } +// JoinCluster resets the local machine and configures it to join an existing cluster. +func (m *Machine) JoinCluster(ctx context.Context, req *pb.JoinClusterRequest) (*emptypb.Empty, error) { + // TODO: a proper cluster leave mechanism and machine reset should be implemented later. + // For now assume the machine wasn't part of a cluster. + + if req.Machine.Id == "" { + return nil, status.Error(codes.InvalidArgument, "machine ID not set") + } + if req.Machine.Name == "" { + return nil, status.Error(codes.InvalidArgument, "machine name not set") + } + if req.Machine.Network == nil { + return nil, status.Error(codes.InvalidArgument, "network not set") + } + if err := req.Machine.Network.Validate(); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid network config: %v", err) + } + if !m.state.Network.PublicKey.Equal(req.Machine.Network.PublicKey) { + return nil, status.Error( + codes.InvalidArgument, "public key in the request does not match the public key on the machine", + ) + } + + // Update the machine state with the provided cluster configuration. + subnet, _ := req.Machine.Network.Subnet.ToPrefix() + manageIP, _ := req.Machine.Network.ManagementIp.ToAddr() + m.state.ID = req.Machine.Id + m.state.Name = req.Machine.Name + m.state.Network = &network.Config{ + Subnet: subnet, + ManagementIP: manageIP, + PrivateKey: m.state.Network.PrivateKey, + PublicKey: m.state.Network.PublicKey, + } + + // Build a peers config from other cluster machines. + m.state.Network.Peers = make([]network.PeerConfig, 0, len(req.OtherMachines)) + for _, om := range req.OtherMachines { + if err := om.Network.Validate(); err != nil { + continue + } + omSubnet, _ := om.Network.Subnet.ToPrefix() + omManageIP, _ := om.Network.ManagementIp.ToAddr() + omEndpoints := make([]netip.AddrPort, len(om.Network.Endpoints)) + for i, ep := range om.Network.Endpoints { + addrPort, _ := ep.ToAddrPort() + omEndpoints[i] = addrPort + } + peer := network.PeerConfig{ + Subnet: &omSubnet, + ManagementIP: omManageIP, + AllEndpoints: omEndpoints, + PublicKey: om.Network.PublicKey, + } + if len(omEndpoints) > 0 { + peer.Endpoint = &omEndpoints[0] + } + m.state.Network.Peers = append(m.state.Network.Peers, peer) + } + + if err := m.state.Save(); err != nil { + return nil, status.Errorf(codes.Internal, "save machine state: %v", err) + } + slog.Info("Machine configured to join the cluster.", "id", m.state.ID, "name", m.state.Name) + // Signal that the machine is initialised as a member of a cluster. + m.initialised <- struct{}{} + // TODO: consider calling a synchronous method to reconfigure the network to return error if it fails. + // Alternatively a client can call another method to check the network status. + + return &emptypb.Empty{}, nil +} + // Token returns the local machine's token that can be used for adding the machine to a cluster. func (m *Machine) Token(_ context.Context, _ *emptypb.Empty) (*pb.TokenResponse, error) { if len(m.state.Network.PublicKey) == 0 {