Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle expiration and auth errors on reconnects. #499

Merged
merged 2 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/nats-io/nats.go

require (
github.com/nats-io/jwt v0.2.12
github.com/nats-io/nkeys v0.1.0
github.com/nats-io/nuid v1.0.1
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
github.com/nats-io/jwt v0.2.12 h1:Y3YLoJey+Q/yMk/1Ig3xhWxYXE7vNSefozkArIcnSlU=
github.com/nats-io/jwt v0.2.12/go.mod h1:mQxQ0uHQ9FhEVPIcTSKwx2lqZEpXWWcCgA7R6NrWvvY=
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
94 changes: 32 additions & 62 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ import (
"math/rand"
"net"
"net/url"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/nats-io/jwt"
"github.com/nats-io/nats.go/util"
"github.com/nats-io/nkeys"
"github.com/nats-io/nuid"
Expand Down Expand Up @@ -67,6 +67,9 @@ const (

// AUTHORIZATION_ERR is for when nats server user authorization has failed.
AUTHORIZATION_ERR = "authorization violation"

// AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired.
AUTHENTICATION_EXPIRED_ERR = "user authentication expired"
)

// Errors
Expand All @@ -84,6 +87,7 @@ var (
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
Expand Down Expand Up @@ -477,6 +481,7 @@ type srv struct {
didConnect bool
reconnects int
lastAttempt time.Time
lastErr error
isImplicit bool
tlsName string
}
Expand Down Expand Up @@ -2244,10 +2249,30 @@ func (nc *Conn) processPermissionsViolation(err string) {
// processAuthorizationViolation is called when the server signals a user
// authorization violation.
func (nc *Conn) processAuthorizationViolation(err string) {
nc.processAuthError(ErrAuthorization)
}

// processAuthenticationExpired is called when the server signals a user
// authorization has expired.
func (nc *Conn) processAuthenticationExpired(err string) {
nc.processAuthError(ErrAuthExpired)
}

// processAuthError generally processing for auth errors. We want to do retries
// unless we get the same error again. This allows us for instance to swap credentials
// and have the app reconnect, but if nothing is changing we should bail.
func (nc *Conn) processAuthError(err error) {
nc.mu.Lock()
nc.err = ErrAuthorization
nc.err = err
if nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, ErrAuthorization) })
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
}
// We should give up if we tried twice on this server and got the
// same error.
if nc.current.lastErr == err {
defer nc.Close()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @kozlovic should we be shutting down here or just removing this one from the list? If there are more servers in the list we probably should not give up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am connected to a cluster, and one server is bad but others are ok, we should be able to reconnect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant yes, why not do that. But I can't really recall why we did this in the first place. What problem exactly were we trying to solve? Is there a situation where a server will return auth error always and not other servers in the cluster? (without being a misconfig). If auth is updated in a server and a client connecting to the server while auth is returning error, we possibly can remove it from the list of urls while the server is then being updated and would have accepted that connection later on.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I revoke your credentials in some form or fashion I wanted the default behavior to not keep trying forever.

} else {
nc.current.lastErr = err
}
nc.mu.Unlock()
}
Expand Down Expand Up @@ -2434,6 +2459,8 @@ func (nc *Conn) processErr(ie string) {
nc.processPermissionsViolation(ne)
} else if strings.HasPrefix(e, AUTHORIZATION_ERR) {
nc.processAuthorizationViolation(ne)
} else if strings.HasPrefix(e, AUTHENTICATION_EXPIRED_ERR) {
nc.processAuthenticationExpired(ne)
} else {
nc.mu.Lock()
nc.err = errors.New("nats: " + ne)
Expand Down Expand Up @@ -3920,70 +3947,20 @@ func NkeyOptionFromSeed(seedFile string) (Option, error) {
return Nkey(string(pub), sigCB), nil
}

// This is a regex to match decorated jwts in keys/seeds.
// .e.g.
// -----BEGIN NATS USER JWT-----
// eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
// ------END NATS USER JWT------
//
// ************************* IMPORTANT *************************
// NKEY Seed printed below can be used sign and prove identity.
// NKEYs are sensitive and should be treated as secrets.
//
// -----BEGIN USER NKEY SEED-----
// SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
// ------END USER NKEY SEED------

var nscDecoratedRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}\n))`)

func userFromFile(userFile string) (string, error) {
contents, err := ioutil.ReadFile(userFile)
if err != nil {
return _EMPTY_, fmt.Errorf("nats: %v", err)
}
defer wipeSlice(contents)

items := nscDecoratedRe.FindAllSubmatch(contents, -1)
if len(items) == 0 {
return string(contents), nil
}
// First result should be the user JWT.
// We copy here so that if the file contained a seed file too we wipe appropriately.
raw := items[0][1]
tmp := make([]byte, len(raw))
copy(tmp, raw)
return string(tmp), nil
return jwt.ParseDecoratedJWT(contents)
}

func nkeyPairFromSeedFile(seedFile string) (nkeys.KeyPair, error) {
var seed []byte
contents, err := ioutil.ReadFile(seedFile)
if err != nil {
return nil, fmt.Errorf("nats: %v", err)
}
defer wipeSlice(contents)

items := nscDecoratedRe.FindAllSubmatch(contents, -1)
if len(items) > 1 {
seed = items[1][1]
} else {
lines := bytes.Split(contents, []byte("\n"))
for _, line := range lines {
if bytes.HasPrefix(bytes.TrimSpace(line), []byte("SU")) {
seed = line
break
}
}
}

if seed == nil {
return nil, fmt.Errorf("nats: No nkey user seed found in %q", seedFile)
}
kp, err := nkeys.FromSeed(seed)
if err != nil {
return nil, err
}
return kp, nil
return jwt.ParseDecoratedNKey(contents)
}

// Sign authentication challenges from the server.
Expand All @@ -4000,13 +3977,6 @@ func sigHandler(nonce []byte, seedFile string) ([]byte, error) {
return sig, nil
}

// Just wipe slice with 'x', for clearing contents of nkey seed file.
func wipeSlice(buf []byte) {
for i := range buf {
buf[i] = 'x'
}
}

type timeoutWriter struct {
timeout time.Duration
conn net.Conn
Expand Down
116 changes: 116 additions & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"testing"
"time"

"github.com/nats-io/jwt"
"github.com/nats-io/nats-server/v2/server"
natsserver "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nkeys"
Expand Down Expand Up @@ -1403,6 +1404,121 @@ func TestUserCredentialsChainedFile(t *testing.T) {
nc.Close()
}

func createNewUserKeys() (string, []byte) {
kp, _ := nkeys.CreateUser()
pub, _ := kp.PublicKey()
priv, _ := kp.Seed()
return pub, priv
}

func TestExpiredUserCredentials(t *testing.T) {
if server.VERSION[0] == '1' {
t.Skip()
}
ts := runTrustServer()
defer ts.Shutdown()

// Create user credentials that will expire in a short timeframe.
pub, priv := createNewUserKeys()
nuc := jwt.NewUserClaims(pub)
nuc.Expires = time.Now().Add(time.Second).Unix()
akp, _ := nkeys.FromSeed(aSeed)
ujwt, err := nuc.Encode(akp)
if err != nil {
t.Fatalf("Error encoding user jwt: %v", err)
}
creds, err := jwt.FormatUserConfig(ujwt, priv)
if err != nil {
t.Fatalf("Error encoding credentials: %v", err)
}
chainedFile := createTmpFile(t, creds)
defer os.Remove(chainedFile)

ch := make(chan bool)

url := fmt.Sprintf("nats://127.0.0.1:%d", TEST_PORT)
nc, err := Connect(url,
UserCredentials(chainedFile),
ReconnectWait(25*time.Millisecond),
MaxReconnects(-1),
ClosedHandler(func(nc *Conn) {
ch <- true
}),
)
if err != nil {
t.Fatalf("Expected to connect, got %v", err)
}
defer nc.Close()

// We should give up since we get the same error on both tries.
if err := WaitTime(ch, 2*time.Second); err != nil {
t.Fatal("Should have closed after multiple failed attempts.")
}
}

func TestExpiredUserCredentialsRenewal(t *testing.T) {
if server.VERSION[0] == '1' {
t.Skip()
}
ts := runTrustServer()
defer ts.Shutdown()

// Create user credentials that will expire in a short timeframe.
pub, priv := createNewUserKeys()
nuc := jwt.NewUserClaims(pub)
nuc.Expires = time.Now().Add(time.Second).Unix()
akp, _ := nkeys.FromSeed(aSeed)
ujwt, err := nuc.Encode(akp)
if err != nil {
t.Fatalf("Error encoding user jwt: %v", err)
}
creds, err := jwt.FormatUserConfig(ujwt, priv)
if err != nil {
t.Fatalf("Error encoding credentials: %v", err)
}
chainedFile := createTmpFile(t, creds)
defer os.Remove(chainedFile)

rch := make(chan bool)

url := fmt.Sprintf("nats://127.0.0.1:%d", TEST_PORT)
nc, err := Connect(url,
UserCredentials(chainedFile),
ReconnectWait(25*time.Millisecond),
MaxReconnects(2),
ReconnectHandler(func(nc *Conn) {
rch <- true
}),
)
if err != nil {
t.Fatalf("Expected to connect, got %v", err)
}
defer nc.Close()

// Place new credentials underneath.
nuc.Expires = time.Now().Add(30 * time.Second).Unix()
ujwt, err = nuc.Encode(akp)
if err != nil {
t.Fatalf("Error encoding user jwt: %v", err)
}
creds, err = jwt.FormatUserConfig(ujwt, priv)
if err != nil {
t.Fatalf("Error encoding credentials: %v", err)
}
if err := ioutil.WriteFile(chainedFile, creds, 0666); err != nil {
t.Fatalf("Error writing conf file: %v", err)
}

// Make sure we get disconnected and reconnected first.
if err := WaitTime(rch, 2*time.Second); err != nil {
t.Fatal("Should have reconnected.")
}

if nc.IsClosed() {
t.Fatal("Got disconnected when we should have reconnected.")
}
}

// If we are using TLS and have multiple servers we try to match the IP
// from a discovered server with the expected hostname for certs without IP
// designations. In certain cases where there is a not authorized error and
Expand Down