Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raft.Shutdown() with InmemTransport hangs after leader changed #312

Closed
babbageclunk opened this issue Mar 19, 2019 · 1 comment
Closed

Comments

@babbageclunk
Copy link
Contributor

We sometimes see the Juju raft tests timeout because Raft.Shutdown has hung.

I've got a standalone reproduction (not that small though) that hangs reliably. Digging into it, I found that the InmemTransport only applies its timeout while waiting to receive responses back from the peer, not while sending requests to it.

package main

import (
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"time"

	"net/http"
	_ "net/http/pprof"

	"github.com/hashicorp/raft"
)

// main reproduces hashicorp/raft issue #312, where Raft.Shutdown() on a newly
// elected leader hangs when using InmemTransport. Steps:
//  1. Bootstrap a single-node cluster (node-0) and wait for it to lead.
//  2. Add node-1 and node-2 as voters.
//  3. Shut node-0 down and close its transport.
//  4. Wait for a successor leader.
//  5. Sleep, disconnect all transports, then shut the successor down.
//
// If the bug is present, the final Shutdown never returns and the last
// "**** ... shut down successfully" line is never printed. The program panics
// on any unexpected error or timeout along the way.
func main() {
	// Serve the handlers registered by the blank net/http/pprof import, so a
	// hung run can be inspected (e.g. goroutine dumps under /debug/pprof/).
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// All three nodes log through one mutex-guarded writer so their lines
	// don't interleave mid-write on stderr.
	syncStderr := &syncWriter{target: os.Stderr}
	r0, t0, err := newRaft("node-0", syncStderr)
	if err != nil {
		panic(err)
	}

	// Seed the cluster with node-0 as its only voter.
	f := r0.BootstrapCluster(raft.Configuration{
		Servers: []raft.Server{{
			ID:      "node-0",
			Address: t0.LocalAddr(),
		}},
	})
	if err := f.Error(); err != nil {
		panic(err)
	}

	// Wait for r0 to be leader.
	select {
	case isLeader := <-r0.LeaderCh():
		if !isLeader {
			panic("r0 wasn't leader")
		}
	case <-time.After(time.Second):
		panic("timed out waiting for r0 to be leader")
	}

	// Make two more and add them to the cluster.
	r1, t1, err := newRaft("node-1", syncStderr)
	if err != nil {
		panic(err)
	}
	r2, t2, err := newRaft("node-2", syncStderr)
	if err != nil {
		panic(err)
	}
	// Transports must be wired together before AddVoter so the leader can
	// reach the new nodes.
	connectTransports(t0, t1, t2)
	f1 := r0.AddVoter("node-1", t1.LocalAddr(), 0, 0)
	f2 := r0.AddVoter("node-2", t2.LocalAddr(), 0, 0)
	if err := f1.Error(); err != nil {
		panic(err)
	}
	if err := f2.Error(); err != nil {
		panic(err)
	}

	// Shut the leader down.
	// t0 is closed here but NOT disconnected from t1/t2 until later.
	// NOTE(review): this may matter for the hang (peers may still try to send
	// to t0); confirm.
	if err := r0.Shutdown().Error(); err != nil {
		panic(err)
	}
	if err := t0.Close(); err != nil {
		panic(err)
	}

	// Wait until one of the other nodes is leader.
	// time.After is re-armed on each iteration, so the 2s limit applies per
	// iteration (e.g. after each false value received), not to the whole loop.
	var newLeader *raft.Raft
	var name string
loop:
	for {
		select {
		case value := <-r1.LeaderCh():
			if value {
				newLeader = r1
				name = "node-1"
				break loop
			}
		case value := <-r2.LeaderCh():
			if value {
				newLeader = r2
				name = "node-2"
				break loop
			}
		case <-time.After(2 * time.Second):
			panic("timed out waiting for new leader")
		}
	}
	// If this sleep is omitted the hang doesn't happen.
	time.Sleep(2 * time.Second)

	// Try to shut the new leader down.
	// Disconnecting every transport pair first (after the sleep above) is the
	// sequence that triggers the reported hang in Shutdown.
	fmt.Printf("**** shutting down new leader %s\n", name)
	disconnectTransports(t0, t1, t2)
	if err := newLeader.Shutdown().Error(); err != nil {
		panic(err)
	}
	fmt.Printf("**** new leader %s shut down successfully\n", name)
}

// newRaft builds a single Raft node identified by id. It uses only in-memory
// components: an InmemTransport, an InmemStore (serving as both log store and
// stable store), an in-memory snapshot store, and a nullFSM.
//
// Heartbeat, election and leader-lease timeouts are all set to 100ms so that
// elections resolve quickly. Log output goes to output, prefixed with the
// node ID.
//
// On success the caller owns the returned *raft.Raft and *raft.InmemTransport
// and is responsible for shutting down / closing them. On error both are nil,
// and any transport created here has already been closed.
func newRaft(id raft.ServerID, output io.Writer) (*raft.Raft, *raft.InmemTransport, error) {
	config := raft.DefaultConfig()
	config.ShutdownOnRemove = false
	config.LocalID = id
	config.HeartbeatTimeout = 100 * time.Millisecond
	config.ElectionTimeout = config.HeartbeatTimeout
	config.LeaderLeaseTimeout = config.HeartbeatTimeout
	config.Logger = log.New(output, fmt.Sprintf("%s ", id), log.LstdFlags)

	// Validate before allocating the transport. Previously the transport was
	// created first and leaked when validation failed.
	if err := raft.ValidateConfig(config); err != nil {
		return nil, nil, err
	}

	// The first return value is the transport's address; it is discarded
	// because callers read it back via transport.LocalAddr().
	_, transport := raft.NewInmemTransport("")
	store := raft.NewInmemStore()
	snapshotStore := raft.NewInmemSnapshotStore()

	r, err := raft.NewRaft(config, &nullFSM{}, store, store, snapshotStore, transport)
	if err != nil {
		// Best-effort cleanup: the NewRaft error is the one worth reporting.
		_ = transport.Close()
		return nil, nil, err
	}
	return r, transport, nil
}

// connectTransports wires every distinct pair of the given transports together
// in both directions, registering each peer under its LocalAddr. A transport is
// never connected to itself; values that compare equal are skipped.
func connectTransports(transports ...raft.LoopbackTransport) {
	for _, from := range transports {
		for _, to := range transports {
			if from != to {
				from.Connect(to.LocalAddr(), to)
			}
		}
	}
}

// disconnectTransports undoes connectTransports. Each transport drops its
// connection to every other given transport, identified by that peer's
// LocalAddr. Values that compare equal are skipped.
func disconnectTransports(transports ...raft.LoopbackTransport) {
	for _, from := range transports {
		for _, to := range transports {
			if from != to {
				from.Disconnect(to.LocalAddr())
			}
		}
	}
}

// nullFSM is the state machine handed to raft.NewRaft by newRaft. It keeps no
// state: applied entries are ignored, snapshots are empty, and restores read
// nothing.
type nullFSM struct{}

// Apply ignores the committed entry and always returns nil.
func (f *nullFSM) Apply(entry *raft.Log) interface{} {
	return nil
}

// Snapshot returns a fresh, empty nullSnapshot and never fails.
func (f *nullFSM) Snapshot() (raft.FSMSnapshot, error) {
	return new(nullSnapshot), nil
}

// Restore closes rc without reading from it. The Close error is deliberately
// discarded, and nil is always returned.
func (f *nullFSM) Restore(rc io.ReadCloser) error {
	_ = rc.Close()
	return nil
}

// nullSnapshot is the empty snapshot produced by nullFSM.Snapshot.
type nullSnapshot struct{}

// Persist writes nothing and simply closes the sink. The Close error is
// deliberately discarded, and nil is always returned.
func (n *nullSnapshot) Persist(sink raft.SnapshotSink) error {
	_ = sink.Close()
	return nil
}

// Release does nothing; nullSnapshot holds no data to free.
func (n *nullSnapshot) Release() {}

// syncWriter wraps an io.Writer so that concurrent Write calls are serialized
// by a mutex. This lets several loggers share one destination without their
// individual writes overlapping.
type syncWriter struct {
	mu     sync.Mutex
	target io.Writer
}

// Write passes p to the wrapped writer while holding the mutex, returning
// whatever the wrapped writer returns.
func (sw *syncWriter) Write(p []byte) (n int, err error) {
	sw.mu.Lock()
	defer sw.mu.Unlock()
	n, err = sw.target.Write(p)
	return n, err
}
@schristoff
Copy link
Contributor

Thank you so much for bringing this to our attention. I'm going to close this issue since the changes have been merged! :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants