Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Race between shutdownCh and consumerCh in inmemPipeline.AppendEntries #275

Open
freeekanayaka opened this issue Feb 12, 2018 · 1 comment
Assignees

Comments

@freeekanayaka
Copy link
Contributor

freeekanayaka commented Feb 12, 2018

This select statement in inmemPipeline.AppendEntries:

	select {
	case i.peer.consumerCh <- rpc:
	case <-timeout:
		return nil, fmt.Errorf("command enqueue timeout")
	case <-i.shutdownCh:
		return nil, ErrPipelineShutdown
	}

can occasionally choose the i.peer.consumerCh case even though shutdownCh has already been closed, so the RPC succeeds even if user code had previously called InmemTransport.Close(), which is what closes shutdownCh.

This behavior is both unexpected and inconsistent with a real-world NetworkTransport, where Close() would close the network listener and cause the pipeline to not deliver any RPC from that point on.

I spotted this in unit tests that were using InmemTransport. I'm attaching a sample program below which reproduces the problem, just run it in a loop and it will eventually fail.

Note however that you'll need to run the sample program against the branch linked to #274, not master, otherwise you'll randomly hit either the race described in this issue XOR the other one described in #273 (which is what #274 fixes).

@freeekanayaka
Copy link
Contributor Author

Here's the sample program that eventually reproduces the issue, if run in a loop:

package main

import (
	"io"
	"log"
	"strconv"
	"time"

	"github.com/hashicorp/raft"
)

// main boots a three-node in-memory raft cluster, waits for one node to be
// notified of leadership, cuts that node off from its peers and then verifies
// that applying a log entry on it fails with raft.ErrLeadershipLost. Any
// deviation terminates the program through log.Fatal/log.Fatalf.
func main() {
	n := 3 // Number of nodes

	// Create raft dependencies
	configs := newConfigs(n)
	stores := newStores(n)
	snapshots := newSnapshotStores(n)
	transports := newTranports(n)

	// Bootstrap initial configuration
	configuration := newConfiguration(n)
	for i := 0; i < n; i++ {
		err := raft.BootstrapCluster(
			configs[i],
			stores[i],
			stores[i],
			snapshots[i],
			transports[i],
			configuration,
		)
		if err != nil {
			log.Fatalf("bootstrap error: %v", err)
		}
	}

	// Setup notification channels
	notifyChs := make([]chan bool, n)
	for i := 0; i < n; i++ {
		notifyChs[i] = make(chan bool)
		configs[i].NotifyCh = notifyChs[i]
	}

	// Start raft instances.
	fsms := newFSMs(n)
	rafts := make([]*raft.Raft, n)
	for i := 0; i < n; i++ {
		// The local is deliberately not called "raft" so that it does not
		// shadow the imported package inside the loop body.
		node, err := raft.NewRaft(
			configs[i],
			fsms[i],
			stores[i],
			stores[i],
			snapshots[i],
			transports[i],
		)
		if err != nil {
			log.Fatalf("start error: %v", err)
		}
		rafts[i] = node
	}

	// Wait for a leader to be elected.
	// NOTE: this select is hard-wired to exactly three channels, so it must
	// be kept in sync with n above.
	var i int
	select {
	case <-notifyChs[0]:
		i = 0
	case <-notifyChs[1]:
		i = 1
	case <-notifyChs[2]:
		i = 2
	}
	if rafts[i].State() != raft.Leader {
		log.Fatal("notified channel triggered even if not is not leader")
	}

	// Disconnect the leader transport from the others.
	transports[i].DisconnectAll()
	for j := 0; j < n; j++ {
		if i == j {
			continue
		}
		transports[j].Disconnect(transports[i].LocalAddr())
	}

	// Apply a new log entry. Since the leader is now disconnected from its
	// peers, the apply future should error with raft.ErrLeadershipLost.
	err := rafts[i].Apply([]byte{}, time.Second).Error()
	if err == nil {
		log.Fatal("apply future did not error despite leader was disconnected")
	}
	if err != raft.ErrLeadershipLost {
		log.Fatalf("apply future errored with %v instead of %v", err, raft.ErrLeadershipLost)
	}
}

// newConfigs returns n raft configurations. Each one starts from
// raft.DefaultConfig, uses its slice index (as a decimal string) for LocalID
// and overrides the heartbeat, election, commit and leader-lease timeouts
// with short values.
func newConfigs(n int) []*raft.Config {
	out := make([]*raft.Config, n)
	for idx := range out {
		cfg := raft.DefaultConfig()
		cfg.LocalID = raft.ServerID(strconv.Itoa(idx))

		// Low timeouts
		cfg.HeartbeatTimeout = 20 * time.Millisecond
		cfg.ElectionTimeout = 20 * time.Millisecond
		cfg.CommitTimeout = 1 * time.Millisecond
		cfg.LeaderLeaseTimeout = 10 * time.Millisecond

		out[idx] = cfg
	}
	return out
}

// newFSMs returns a slice of n freshly allocated dummy FSM values, exposed
// through the raft.FSM interface.
func newFSMs(n int) []raft.FSM {
	machines := make([]raft.FSM, n)
	for idx := range machines {
		machines[idx] = new(FSM)
	}
	return machines
}

// newStores returns n independent in-memory stores, each created with
// raft.NewInmemStore.
func newStores(n int) []*raft.InmemStore {
	out := make([]*raft.InmemStore, n)
	for idx := range out {
		out[idx] = raft.NewInmemStore()
	}
	return out
}

// newSnapshotStores returns n independent in-memory snapshot stores, each
// created with raft.NewInmemSnapshotStore.
func newSnapshotStores(n int) []raft.SnapshotStore {
	out := make([]raft.SnapshotStore, n)
	for idx := range out {
		out[idx] = raft.NewInmemSnapshotStore()
	}
	return out
}

// newTranports returns n in-memory transports. Each transport's address is
// its slice index as a decimal string, and every transport is connected to
// every other one (but not to itself).
func newTranports(n int) []raft.LoopbackTransport {
	all := make([]raft.LoopbackTransport, n)
	for idx := range all {
		_, trans := raft.NewInmemTransport(raft.ServerAddress(strconv.Itoa(idx)))
		all[idx] = trans
	}

	// Connect each ordered pair of distinct transports.
	for _, from := range all {
		for _, to := range all {
			if to != from {
				from.Connect(to.LocalAddr(), to)
			}
		}
	}
	return all
}

// newConfiguration returns a raft.Configuration listing n servers. Server k
// uses the decimal string of k as both its ID and its Address; all other
// server fields keep their zero values.
func newConfiguration(n int) raft.Configuration {
	members := make([]raft.Server, n)
	for idx := range members {
		name := strconv.Itoa(idx)
		members[idx].ID = raft.ServerID(name)
		members[idx].Address = raft.ServerAddress(name)
	}
	return raft.Configuration{Servers: members}
}

// FSM is a dummy state machine whose methods do nothing.
type FSM struct{}

// Apply ignores the given log entry and always returns nil.
func (f *FSM) Apply(*raft.Log) interface{} {
	return nil
}

// Snapshot always returns a new empty FSMSnapshot and a nil error.
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
	snap := &FSMSnapshot{}
	return snap, nil
}

// Restore ignores the given reader and always returns nil.
func (f *FSM) Restore(io.ReadCloser) error {
	return nil
}

// FSMSnapshot is the empty snapshot handed out by the dummy FSM.
type FSMSnapshot struct{}

// Persist writes nothing to sink and always returns nil.
func (s *FSMSnapshot) Persist(sink raft.SnapshotSink) error {
	return nil
}

// Release is a no-op.
func (s *FSMSnapshot) Release() {
}

freeekanayaka added a commit to freeekanayaka/go-raft that referenced this issue Feb 12, 2018
This change fixes a race between the shutdownCh and the
consumerCh in the select statement of inmemPipeline.AppendEntries,
which could otherwise forward an RPC request to the consumer even if
the transport was closed with InmemTransport.Close().

See also hashicorp#275 for a sample program that reproduces the race if run
long enough. The same program doesn't fail anymore with this change
applied.
freeekanayaka added a commit to freeekanayaka/go-raft that referenced this issue Feb 12, 2018
This change fixes a race between the shutdownCh and the
consumerCh in the select statement of inmemPipeline.AppendEntries,
which could otherwise forward an RPC request to the consumer even if
the transport was closed with InmemTransport.Close().

See also hashicorp#275 for a sample program that reproduces the race if run
long enough. The same program doesn't fail anymore with this change
applied.
@schristoff schristoff added the bug label Apr 25, 2019
@schristoff schristoff removed their assignment Jan 25, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants