Skip to content

Commit

Permalink
bugfix: flaky queue/Example_connectionPool
Browse files Browse the repository at this point in the history
We need to wait for an additional events to make the test stable:

1. A ready state for a queue.
2. A success queue configuration on all instances.
3. An available RW instance.
4. A success role switch.

Closes #278
  • Loading branch information
oleg-jukovec committed May 12, 2023
1 parent e3d2e03 commit bc2537e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
set (#272)
- Connect() panics on concurrent schema update (#278)
- Wrong Ttr setup by Queue.Cfg() (#278)
- Flaky queue/Example_connectionPool (#278)

## [1.10.0] - 2022-12-31

Expand Down
99 changes: 60 additions & 39 deletions queue/example_connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ type QueueConnectionHandler struct {
name string
cfg queue.Cfg

uuid uuid.UUID
registered bool
err error
mutex sync.Mutex
masterUpdated chan struct{}
masterCnt int32
uuid uuid.UUID
registered bool
err error
mutex sync.Mutex
updated chan struct{}
masterCnt int32
}

// QueueConnectionHandler implements the ConnectionHandler interface.
Expand All @@ -32,9 +32,9 @@ var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
// NewQueueConnectionHandler creates a QueueConnectionHandler object.
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
return &QueueConnectionHandler{
name: name,
cfg: cfg,
masterUpdated: make(chan struct{}, 10),
name: name,
cfg: cfg,
updated: make(chan struct{}, 10),
}
}

Expand All @@ -53,15 +53,24 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
}

master := role == connection_pool.MasterRole
if master {
defer func() {
h.masterUpdated <- struct{}{}
}()
}

// Set up a queue module configuration for an instance.
q := queue.New(conn, h.name)

// Check is queue ready to work.
if state, err := q.State(); err != nil {
h.updated <- struct{}{}
h.err = err
return err
} else if master && state != queue.RunningState {
return fmt.Errorf("queue state is not RUNNING: %d", state)
} else if !master && state != queue.InitState && state != queue.WaitingState {
return fmt.Errorf("queue state is not INIT and not WAITING: %d", state)
}

defer func() {
h.updated <- struct{}{}
}()

// Set up a queue module configuration for an instance. Ideally, this
// should be done before box.cfg({}) or you need to wait some time
// before start a work.
Expand All @@ -79,10 +88,6 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
return nil
}

if h.err = q.Create(h.cfg); h.err != nil {
return h.err
}

if !h.registered {
// We register a shared session at the first time.
if h.uuid, h.err = q.Identify(nil); h.err != nil {
Expand All @@ -96,6 +101,10 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
}
}

if h.err = q.Create(h.cfg); h.err != nil {
return h.err
}

fmt.Printf("Master %s is ready to work!\n", conn.Addr())
atomic.AddInt32(&h.masterCnt, 1)

Expand All @@ -113,7 +122,7 @@ func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,

// Closes closes a QueueConnectionHandler object.
func (h *QueueConnectionHandler) Close() {
close(h.masterUpdated)
close(h.updated)
}

// Example demonstrates how to use the queue package with the connection_pool
Expand Down Expand Up @@ -162,8 +171,10 @@ func Example_connectionPool() {
}
defer connPool.Close()

// Wait for a master instance identification in the queue.
<-h.masterUpdated
// Wait for a queue initialization and master instance identification in
// the queue.
<-h.updated
<-h.updated
if h.err != nil {
fmt.Printf("Unable to identify in the pool: %s", h.err)
return
Expand All @@ -184,14 +195,17 @@ func Example_connectionPool() {

// Switch a master instance in the pool.
roles := []bool{true, false}
err = test_helpers.SetClusterRO(servers, connOpts, roles)
if err != nil {
fmt.Printf("Unable to set cluster roles: %s", err)
return
for {
err := test_helpers.SetClusterRO(servers, connOpts, roles)
if err == nil {
break
}
}

// Wait for a new master instance re-identification.
<-h.masterUpdated
// Wait for a replica instance connection and a new master instance
// re-identification.
<-h.updated
<-h.updated
h.mutex.Lock()
err = h.err
h.mutex.Unlock()
Expand All @@ -211,17 +225,24 @@ func Example_connectionPool() {
time.Sleep(poolOpts.CheckTimeout)
}

// Take a data from the new master instance.
task, err := q.Take()
if err != nil {
fmt.Println("Unable to got task:", err)
} else if task == nil {
fmt.Println("task == nil")
} else if task.Data() == nil {
fmt.Println("task.Data() == nil")
} else {
task.Ack()
fmt.Println("Got data:", task.Data())
for {
// Take a data from the new master instance.
task, err := q.Take()

if err == connection_pool.ErrNoRwInstance {
// It may be not registered yet by the pool.
continue
} else if err != nil {
fmt.Println("Unable to got task:", err)
} else if task == nil {
fmt.Println("task == nil")
} else if task.Data() == nil {
fmt.Println("task.Data() == nil")
} else {
task.Ack()
fmt.Println("Got data:", task.Data())
}
break
}

// Output:
Expand Down

0 comments on commit bc2537e

Please sign in to comment.