-
Notifications
You must be signed in to change notification settings - Fork 712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ADDED] Ability to retrieve client ID from server currently connected to #395
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,6 +93,7 @@ var ( | |
ErrInvalidArg = errors.New("nats: invalid argument") | ||
ErrInvalidContext = errors.New("nats: invalid context") | ||
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") | ||
ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") | ||
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) | ||
) | ||
|
||
|
@@ -428,6 +429,7 @@ type serverInfo struct { | |
MaxPayload int64 `json:"max_payload"` | ||
ConnectURLs []string `json:"connect_urls,omitempty"` | ||
Proto int `json:"proto,omitempty"` | ||
CID uint64 `json:"client_id,omitempty"` | ||
} | ||
|
||
const ( | ||
|
@@ -3476,3 +3478,20 @@ func (nc *Conn) Barrier(f func()) error { | |
nc.mu.Unlock() | ||
return nil | ||
} | ||
|
||
// GetClientID returns the client ID assigned by the server to which | ||
// the client is currently connected to. Note that the value may change if | ||
// the client reconnects. | ||
// This function returns ErrNoClientIDReturned if the server is of a | ||
// version prior to 1.2.0. | ||
func (nc *Conn) GetClientID() (uint64, error) { | ||
nc.mu.Lock() | ||
defer nc.mu.Unlock() | ||
if nc.isClosed() { | ||
return 0, ErrConnectionClosed | ||
} | ||
if nc.info.CID == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any better way to make sure this was not included in the INFO json? I doubt we would wrap an uint64 but if we did I think it would show up as a zero. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then the server should make sure that if it wraps, then it start again to 1. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe post an issue against the server. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, is it really worth it? I mean at 10M new connections a second, it would take 58,494 years without restarting the server before it wraps (if my math is correct). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, so will change the error name/content and update |
||
return 0, ErrClientIDNotSupported | ||
} | ||
return nc.info.CID, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2051,3 +2051,140 @@ func TestNilOpts(t *testing.T) { | |
t.Fatal("Unexpected error: option not set.") | ||
} | ||
} | ||
|
||
func TestGetClientID(t *testing.T) { | ||
if serverVersionAtLeast(1, 2, 0) != nil { | ||
t.SkipNow() | ||
} | ||
optsA := test.DefaultTestOptions | ||
optsA.Port = -1 | ||
optsA.Cluster.Port = -1 | ||
srvA := RunServerWithOptions(optsA) | ||
defer srvA.Shutdown() | ||
|
||
ch := make(chan bool, 1) | ||
nc1, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvA.Addr().(*net.TCPAddr).Port), | ||
nats.DiscoveredServersHandler(func(_ *nats.Conn) { | ||
ch <- true | ||
}), | ||
nats.ReconnectHandler(func(_ *nats.Conn) { | ||
ch <- true | ||
})) | ||
if err != nil { | ||
t.Fatalf("Error on connect: %v", err) | ||
} | ||
defer nc1.Close() | ||
|
||
cid, err := nc1.GetClientID() | ||
if err != nil { | ||
t.Fatalf("Error getting CID: %v", err) | ||
} | ||
if cid == 0 { | ||
t.Fatal("Unexpected cid value, make sure server is 1.2.0+") | ||
} | ||
|
||
// Start a second server and verify that async INFO contains client ID | ||
optsB := test.DefaultTestOptions | ||
optsB.Port = -1 | ||
optsB.Cluster.Port = -1 | ||
optsB.Routes = server.RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", srvA.ClusterAddr().Port)) | ||
srvB := RunServerWithOptions(optsB) | ||
defer srvB.Shutdown() | ||
|
||
// Wait for the discovered callback to fire | ||
if err := Wait(ch); err != nil { | ||
t.Fatal("Did not the discovered callback") | ||
} | ||
// Now check CID should be valid and same as before | ||
newCID, err := nc1.GetClientID() | ||
if err != nil { | ||
t.Fatalf("Error getting CID: %v", err) | ||
} | ||
if newCID != cid { | ||
t.Fatalf("Expected CID to be %v, got %v", cid, newCID) | ||
} | ||
|
||
// Create a client to server B | ||
nc2, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", srvB.Addr().(*net.TCPAddr).Port)) | ||
if err != nil { | ||
t.Fatalf("Error on connect: %v", err) | ||
} | ||
defer nc2.Close() | ||
|
||
// Stop server A, nc1 will reconnect to B, and should have different CID | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could be the same since CID is scoped to the server. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No since I made sure that there was another client connected to server B prior to killing A and having nc1 reconnect to server B. |
||
srvA.Shutdown() | ||
// Wait for nc1 to reconnect | ||
if err := Wait(ch); err != nil { | ||
t.Fatal("Did not reconnect") | ||
} | ||
newCID, err = nc1.GetClientID() | ||
if err != nil { | ||
t.Fatalf("Error getting CID: %v", err) | ||
} | ||
if newCID == 0 { | ||
t.Fatal("Unexpected cid value, make sure server is 1.2.0+") | ||
} | ||
if newCID == cid { | ||
t.Fatalf("Expected different CID since server already had a client") | ||
} | ||
nc1.Close() | ||
newCID, err = nc1.GetClientID() | ||
if err == nil { | ||
t.Fatalf("Expected error, got none") | ||
} | ||
if newCID != 0 { | ||
t.Fatalf("Expected 0 on connection closed, got %v", newCID) | ||
} | ||
|
||
// Stop clients and remaining server | ||
nc1.Close() | ||
nc2.Close() | ||
srvB.Shutdown() | ||
|
||
// Now have dummy server that returns no CID and check we get expected error. | ||
l, e := net.Listen("tcp", "127.0.0.1:0") | ||
if e != nil { | ||
t.Fatal("Could not listen on an ephemeral port") | ||
} | ||
tl := l.(*net.TCPListener) | ||
defer tl.Close() | ||
|
||
addr := tl.Addr().(*net.TCPAddr) | ||
|
||
wg := sync.WaitGroup{} | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
conn, err := l.Accept() | ||
if err != nil { | ||
t.Fatalf("Error accepting client connection: %v\n", err) | ||
} | ||
defer conn.Close() | ||
info := fmt.Sprintf("INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n", addr.IP, addr.Port) | ||
conn.Write([]byte(info)) | ||
|
||
// Read connect and ping commands sent from the client | ||
line := make([]byte, 256) | ||
_, err = conn.Read(line) | ||
if err != nil { | ||
t.Fatalf("Expected CONNECT and PING from client, got: %s", err) | ||
} | ||
conn.Write([]byte("PONG\r\n")) | ||
// Now wait to be notified that we can finish | ||
<-ch | ||
}() | ||
|
||
nc, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", addr.Port)) | ||
if err != nil { | ||
t.Fatalf("Error on connect: %v", err) | ||
} | ||
defer nc.Close() | ||
|
||
if cid, err := nc.GetClientID(); err != nats.ErrClientIDNotSupported || cid != 0 { | ||
t.Fatalf("Expected err=%v and cid=0, got err=%v and cid=%v", nats.ErrClientIDNotSupported, err, cid) | ||
} | ||
// Release fake server | ||
nc.Close() | ||
ch <- true | ||
wg.Wait() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If 0 return an error that it was not provided.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done