Skip to content

Commit

Permalink
client: don't lock in PromptShutdown, do it on Core shutdown
Browse files Browse the repository at this point in the history
Lock and stop wallets after other Core goroutines.

Previously we gave `(*Core).PromptShutdown` the expanded responsibility
of not just checking and prompting if a shutdown was OK but also locking
the wallets.  This was not the right place to do that for several reasons:

1. `PromptShutdown` is just a check and prompt function, not a "partial
  shutdown" function.  Shutdown via context cancellation is the correct approach.
2. `Core` consumers could and were not using `PromptShutdown`, and
  instead just cancelling `Core`'s context thus leaving the wallets unlocked
  after shutdown, thus leading to the now-fixed "wallet not unlocked" bug
  (8ff4d94) that was requiring such
  consumers to also restart their wallets when restarting.
3. Locking the wallets and the dex accounts before shutting down
  `(*Core).Run` goroutine was incorrect. After the DB and `dexConnection`s
  stop, the wallets may be locked and dex account private keys cleared.

* client/core: (*xcWallet).Connect must Disconnect on error

If (*xcWallet).Connect passed the call to connector.Connect, but any of
the remaining calls such as SyncStatus and OwnsAddress failed, Connect
would return a non-nil error and leave hookedUp=false.  However the
dex.ConnectionMaster (connector) was still connected.
This modifies (*xcWallet).Connect to Disconnect on any such error.

Also, with the newly-safe dex.(*ConnectionMaster).Disconnect method, we
now just call (*xcWallet).Disconnected without having to infer if it is
safe via (*xcWallet).connected.

* lock wallet before disconnect in CreateWallet and setWalletPassword
  • Loading branch information
chappjc authored Feb 11, 2021
1 parent cefe6a5 commit 8976d8d
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 78 deletions.
1 change: 1 addition & 0 deletions client/cmd/dexc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func main() {
go func() {
for range killChan {
if clientCore.PromptShutdown() {
log.Infof("Shutting down...")
cancel()
return
}
Expand Down
125 changes: 64 additions & 61 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,11 @@ func New(cfg *Config) (*Core, error) {

// Run runs the core. Satisfies the runner.Runner interface.
func (c *Core) Run(ctx context.Context) {
c.log.Infof("started DEX client core")
c.log.Infof("Starting DEX client core")
// Store the context as a field, since we will need to spawn new DEX threads
// when new accounts are registered.
c.ctx = ctx
c.initialize()
c.initialize() // connectDEX gets ctx for the wsConn
close(c.ready)

// The DB starts first and stops last.
Expand All @@ -913,12 +913,39 @@ func (c *Core) Run(ctx context.Context) {
c.latencyQ.Run(ctx)
}()

c.wg.Wait()
c.wg.Wait() // block here until all goroutines except DB complete

// Stop the DB after dexConnections and other goroutines are done.
stopDB()
dbWG.Wait()

// Lock and disconnect the wallets.
c.walletMtx.Lock()
for assetID, wallet := range c.wallets {
delete(c.wallets, assetID)
if !wallet.connected() {
continue
}
symb := strings.ToUpper(unbip(assetID))
c.log.Infof("Locking %s wallet", symb)
if err := wallet.Lock(); err != nil {
c.log.Errorf("Failed to lock %v wallet: %v", symb, err)
}
wallet.Disconnect()
}
c.walletMtx.Unlock()

// Clear account private keys and wait for the DEX ws connections that began
// shutting down on context cancellation.
c.connMtx.Lock()
defer c.connMtx.Unlock()
for _, dc := range c.conns {
// context should be canceled allowing just a Wait(), but just in case
// use Disconnect otherwise this could hang forever.
dc.connMaster.Disconnect()
dc.acct.lock()
}

c.log.Infof("DEX client core off")
}

Expand Down Expand Up @@ -1110,7 +1137,7 @@ func (c *Core) connectedWallet(assetID uint32) (*xcWallet, error) {
// synching, this also starts a goroutine to monitor sync status, emitting
// WalletStateNotes on each progress update.
func (c *Core) connectWallet(w *xcWallet) (depositAddr string, err error) {
err = w.Connect(c.ctx) // ensures valid deposit address
err = w.Connect() // ensures valid deposit address
if err != nil {
return "", codedError(connectWalletErr, err)
}
Expand Down Expand Up @@ -1392,13 +1419,15 @@ func (c *Core) CreateWallet(appPW, walletPW []byte, form *WalletForm) error {
}

initErr := func(s string, a ...interface{}) error {
_ = wallet.Lock() // just try, but don't confuse the user with an error
wallet.Disconnect()
return fmt.Errorf(s, a...)
}

err = wallet.Unlock(crypter)
if err != nil {
return initErr("%s wallet authentication error: %w", symbol, err)
wallet.Disconnect()
return fmt.Errorf("%s wallet authentication error: %w", symbol, err)
}

balances, err := c.walletBalance(wallet)
Expand Down Expand Up @@ -1647,6 +1676,8 @@ func (c *Core) ReconfigureWallet(appPW, newWalletPW []byte, assetID uint32, cfg
c.walletMtx.Unlock()

if oldWallet.connected() {
// NOTE: Cannot lock the wallet backend because it may be the same as
// the one just connected.
go oldWallet.Disconnect()
}

Expand Down Expand Up @@ -1751,13 +1782,16 @@ func (c *Core) setWalletPassword(wallet *xcWallet, newPW []byte, crypter encrypt
return codedError(dbErr, err)
}

if !wasConnected {
wallet.Disconnect()
} else if !wasUnlocked {
// Re-lock the wallet if it was previously locked.
if !wasUnlocked {
if err = wallet.Lock(); err != nil {
c.log.Warnf("Unable to relock %s wallet: %v", unbip(wallet.AssetID), err)
}
}
// Disconnect if it was not previously connected.
if !wasConnected {
wallet.Disconnect()
}

details := fmt.Sprintf("Password for %s wallet has been updated.",
unbip(wallet.AssetID))
Expand Down Expand Up @@ -4534,72 +4568,41 @@ func (c *Core) tipChange(assetID uint32, nodeErr error) {
c.updateBalances(assets)
}

// PromptShutdown asks confirmation to shutdown the dexc when has active orders.
// If the user answers in the affirmative, the wallets are locked and true is
// returned. The provided channel is used to allow an OS signal to break the
// prompt and force the shutdown with out answering the prompt in the
// affirmative.
// PromptShutdown checks if there are active orders and asks confirmation to
// shutdown if there are. The return value indicates if it is safe to stop Core
// or if the user has confirmed they want to shutdown with active orders.
func (c *Core) PromptShutdown() bool {
conns := c.dexConnections()

lockWallets := func() {
// Lock wallets
for _, w := range c.xcWallets() {
if w.connected() {
if err := w.Lock(); err != nil {
c.log.Errorf("error locking wallet: %v", err)
}
}
}
// If all wallets locked, lock each dex account.
for _, dc := range conns {
dc.acct.lock()
}
}

ok := true
var haveActiveOrders bool
for _, dc := range conns {
if dc.hasActiveOrders() {
ok = false
haveActiveOrders = true
break
}
}

if !ok {
fmt.Print("You have active orders. Shutting down now may result in failed swaps and account penalization.\n" +
"Do you want to quit anyway? ('y' to quit, 'n' or enter to abort shutdown):")
scanner := bufio.NewScanner(os.Stdin)
scanner.Scan()
if err := scanner.Err(); err != nil {
fmt.Printf("Input error: %v", err)
return false
}

switch strings.ToLower(scanner.Text()) {
case "y", "yes":
ok = true
case "n", "no":
fallthrough
default:
fmt.Println("Shutdown aborted.")
}
if !haveActiveOrders {
return true
}

if ok {
lockWallets()
fmt.Print("You have active orders. Shutting down now may result in failed swaps and account penalization.\n" +
"Do you want to quit anyway? ('yes' to quit, or enter to abort shutdown): ")
scanner := bufio.NewScanner(os.Stdin)
scanner.Scan() // waiting for user input
if err := scanner.Err(); err != nil {
fmt.Printf("Input error: %v", err)
return false
}

return ok
}

// ForceShutdown shuts down the client, ignoring any active trades.
func (c *Core) ForceShutdown() {
for _, wallet := range c.xcWallets() {
wallet.Lock()
}
for _, dc := range c.dexConnections() {
dc.acct.lock()
switch resp := strings.ToLower(scanner.Text()); resp {
case "y", "yes":
return true
case "n", "no", "":
default: // anything else aborts, but warn about it
fmt.Printf("Unrecognized response %q. ", resp)
}
fmt.Println("Shutdown aborted.")
return false
}

// convertAssetInfo converts from a *msgjson.Asset to the nearly identical
Expand Down
32 changes: 19 additions & 13 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,6 @@ type TXCWallet struct {
locked bool
changeCoin *tCoin
syncStatus func() (bool, float32, error)
connectWG *sync.WaitGroup
confsMtx sync.RWMutex
confs map[string]uint32
confsErr map[string]error
Expand All @@ -571,7 +570,6 @@ func newTWallet(assetID uint32) (*xcWallet, *TXCWallet) {
w := &TXCWallet{
changeCoin: &tCoin{id: encode.RandomBytes(36)},
syncStatus: func() (synced bool, progress float32, err error) { return true, 1, nil },
connectWG: &sync.WaitGroup{},
confs: make(map[string]uint32),
confsErr: make(map[string]error),
}
Expand All @@ -597,11 +595,15 @@ func (w *TXCWallet) OwnsAddress(address string) (bool, error) {
}

func (w *TXCWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) {
return w.connectWG, w.connectErr
var wg sync.WaitGroup
wg.Add(1)
go func() {
<-ctx.Done()
wg.Done()
}()
return &wg, w.connectErr
}

func (w *TXCWallet) Run(ctx context.Context) { <-ctx.Done() }

func (w *TXCWallet) Balance() (*asset.Balance, error) {
if w.balErr != nil {
return nil, w.balErr
Expand Down Expand Up @@ -4915,7 +4917,10 @@ func TestReconfigureWallet(t *testing.T) {
return xyzWallet.Wallet, nil
},
})
xyzWallet.Connect(tCtx)
if err = xyzWallet.Connect(); err != nil {
t.Fatal(err)
}
defer xyzWallet.Disconnect()

// Connect error
tXyzWallet.connectErr = tErr
Expand Down Expand Up @@ -5905,11 +5910,10 @@ func TestWalletSyncing(t *testing.T) {

noteFeed := tCore.NotificationFeed()
dcrWallet, tDcrWallet := newTWallet(tDCR.ID)
tDcrWallet.connectWG.Add(1)
defer tDcrWallet.connectWG.Done()
dcrWallet.synced = false
dcrWallet.syncProgress = 0
dcrWallet.Connect(tCtx)
_ = dcrWallet.Connect()
defer dcrWallet.Disconnect()

tStart := time.Now()
testDuration := 100 * time.Millisecond
Expand All @@ -5928,7 +5932,8 @@ func TestWalletSyncing(t *testing.T) {
t.Fatalf("connectWallet error: %v", err)
}

timeout := time.NewTimer(time.Second).C
timeout := time.NewTimer(time.Second)
defer timeout.Stop()
var progressNotes int
out:
for {
Expand All @@ -5942,15 +5947,16 @@ out:
break out
}
progressNotes++
case <-timeout:
case <-timeout.C:
t.Fatalf("timed out waiting for synced wallet note. Received %d progress notes", progressNotes)
}
}
// Should get 9 notes, but just make sure we get at least half of them to
// avoid github vm false positives.
if progressNotes < 5 {
t.Fatalf("expected 23 progress notes, got %d", progressNotes)
if progressNotes < 5 /* 9? */ {
t.Fatalf("expected 9 progress notes, got %d", progressNotes)
}
// t.Logf("got %d progress notes", progressNotes)
}

func TestParseCert(t *testing.T) {
Expand Down
14 changes: 11 additions & 3 deletions client/core/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,20 @@ func (w *xcWallet) connected() bool {
}

// Connect calls the dex.Connector's Connect method, sets the xcWallet.hookedUp
// flag to true, and validates the deposit address.
func (w *xcWallet) Connect(ctx context.Context) error {
err := w.connector.Connect(ctx)
// flag to true, and validates the deposit address. Use Disconnect to cleanly
// shutdown the wallet.
func (w *xcWallet) Connect() error {
// No parent context; use Disconnect instead.
err := w.connector.Connect(context.Background())
if err != nil {
return err
}
// Now that we are connected, we must Disconnect if any calls fail below
// since we are considering this wallet not "hookedUp".

synced, progress, err := w.SyncStatus()
if err != nil {
w.connector.Disconnect()
return err
}

Expand All @@ -189,12 +195,14 @@ func (w *xcWallet) Connect(ctx context.Context) error {
if haveAddress {
haveAddress, err = w.OwnsAddress(w.address)
if err != nil {
w.connector.Disconnect()
return err
}
}
if !haveAddress {
w.address, err = w.Address()
if err != nil {
w.connector.Disconnect()
return fmt.Errorf("%s Wallet.Address error: %w", unbip(w.AssetID), err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion dex/testing/loadbot/mantle.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ out:
}
}

m.ForceShutdown()
// Let Core shutdown and lock up.
m.waiter.WaitForShutdown()
}

// A Mantle is a wrapper for *core.Core that adds some useful LoadBot methods
Expand Down

0 comments on commit 8976d8d

Please sign in to comment.