Skip to content

Commit

Permalink
Merge pull request #881 from sparklxb/master
Browse files Browse the repository at this point in the history
nsqd: add producer client connections to /stats
  • Loading branch information
mreiferson authored Aug 20, 2018
2 parents e319f8c + 996be6d commit e0e08a2
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 48 deletions.
25 changes: 25 additions & 0 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type clientV2 struct {
FinishCount uint64
RequeueCount uint64

pubCounts map[string]uint64

writeLock sync.RWMutex
metaLock sync.RWMutex

Expand Down Expand Up @@ -141,6 +143,8 @@ func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 {

// heartbeats are client configurable but default to 30s
HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2,

pubCounts: make(map[string]uint64),
}
c.lenSlice = c.lenBuf[:]
return c
Expand Down Expand Up @@ -211,6 +215,13 @@ func (c *clientV2) Stats() ClientStats {
identity = c.AuthState.Identity
identityURL = c.AuthState.IdentityURL
}
pubCounts := make([]PubCount, 0, len(c.pubCounts))
for topic, count := range c.pubCounts {
pubCounts = append(pubCounts, PubCount{
Topic: topic,
Count: count,
})
}
c.metaLock.RUnlock()
stats := ClientStats{
Version: "V2",
Expand All @@ -232,6 +243,7 @@ func (c *clientV2) Stats() ClientStats {
Authed: c.HasAuthorizations(),
AuthIdentity: identity,
AuthIdentityURL: identityURL,
PubCounts: pubCounts,
}
if stats.TLS {
p := prettyConnectionState{c.tlsConn.ConnectionState()}
Expand All @@ -243,6 +255,13 @@ func (c *clientV2) Stats() ClientStats {
return stats
}

func (c *clientV2) IsProducer() bool {
c.metaLock.RLock()
retval := len(c.pubCounts) > 0
c.metaLock.RUnlock()
return retval
}

// struct to convert from integers to the human readable strings
type prettyConnectionState struct {
tls.ConnectionState
Expand Down Expand Up @@ -343,6 +362,12 @@ func (c *clientV2) SendingMessage() {
atomic.AddUint64(&c.MessageCount, 1)
}

func (c *clientV2) PublishedMessage(topic string, count uint64) {
c.metaLock.Lock()
c.pubCounts[topic] += count
c.metaLock.Unlock()
}

func (c *clientV2) TimedOutMessage() {
atomic.AddInt64(&c.InFlightCount, -1)
c.tryUpdateReadyState()
Expand Down
151 changes: 103 additions & 48 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,109 +481,136 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
channelName, _ := reqParams.Get("channel")
jsonFormat := formatString == "json"

producerStats := s.ctx.nsqd.GetProducerStats()

stats := s.ctx.nsqd.GetStats(topicName, channelName)
health := s.ctx.nsqd.GetHealth()
startTime := s.ctx.nsqd.GetStartTime()
uptime := time.Since(startTime)

// If we WERE given a topic-name, remove stats for all the other topics:
// filter by topic (if specified)
if len(topicName) > 0 {
// Find the desired-topic-index:
for _, topicStats := range stats {
if topicStats.TopicName == topicName {
// If we WERE given a channel-name, remove stats for all the other channels:
// filter by channel (if specified)
if len(channelName) > 0 {
// Find the desired-channel:
for _, channelStats := range topicStats.Channels {
if channelStats.ChannelName == channelName {
topicStats.Channels = []ChannelStats{channelStats}
// We've got the channel we were looking for:
break
}
}
}

// We've got the topic we were looking for:
stats = []TopicStats{topicStats}
break
}
}

filteredProducerStats := make([]ClientStats, 0)
for _, clientStat := range producerStats {
var found bool
var count uint64
for _, v := range clientStat.PubCounts {
if v.Topic == topicName {
count = v.Count
found = true
break
}
}
if !found {
continue
}
clientStat.PubCounts = []PubCount{PubCount{
Topic: topicName,
Count: count,
}}
filteredProducerStats = append(filteredProducerStats, clientStat)
}
producerStats = filteredProducerStats
}

ms := getMemStats()
if !jsonFormat {
return s.printStats(stats, ms, health, startTime, uptime), nil
return s.printStats(stats, producerStats, ms, health, startTime, uptime), nil
}

return struct {
Version string `json:"version"`
Health string `json:"health"`
StartTime int64 `json:"start_time"`
Topics []TopicStats `json:"topics"`
Memory memStats `json:"memory"`
}{version.Binary, health, startTime.Unix(), stats, ms}, nil
Version string `json:"version"`
Health string `json:"health"`
StartTime int64 `json:"start_time"`
Topics []TopicStats `json:"topics"`
Memory memStats `json:"memory"`
Producers []ClientStats `json:"producers"`
}{version.Binary, health, startTime.Unix(), stats, ms, producerStats}, nil
}

func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte {
func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte {
var buf bytes.Buffer
w := &buf

now := time.Now()
io.WriteString(w, fmt.Sprintf("%s\n", version.String("nsqd")))
io.WriteString(w, fmt.Sprintf("start_time %v\n", startTime.Format(time.RFC3339)))
io.WriteString(w, fmt.Sprintf("uptime %s\n", uptime))

fmt.Fprintf(w, "%s\n", version.String("nsqd"))
fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339))
fmt.Fprintf(w, "uptime %s\n", uptime)

fmt.Fprintf(w, "\nHealth: %s\n", health)

fmt.Fprintf(w, "\nMemory:\n")
fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects)
fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUseBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapReleasedBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseUsec100)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUsec99)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUsec95)
fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns)

if len(stats) == 0 {
io.WriteString(w, "\nNO_TOPICS\n")
return buf.Bytes()
fmt.Fprintf(w, "\nTopics: None\n")
} else {
fmt.Fprintf(w, "\nTopics:")
}
fmt.Fprintf(w, "\nMemory:\n")
fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects)
fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUseBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapReleasedBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseUsec100)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUsec99)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUsec95)
fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes)
fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns)

io.WriteString(w, fmt.Sprintf("\nHealth: %s\n", health))

for _, t := range stats {
var pausedPrefix string
if t.Paused {
pausedPrefix = "*P "
} else {
pausedPrefix = " "
}
io.WriteString(w, fmt.Sprintf("\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
pausedPrefix,
t.TopicName,
t.Depth,
t.BackendDepth,
t.MessageCount,
t.E2eProcessingLatency))
t.E2eProcessingLatency,
)
for _, c := range t.Channels {
if c.Paused {
pausedPrefix = " *P "
} else {
pausedPrefix = " "
}
io.WriteString(w,
fmt.Sprintf("%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
pausedPrefix,
c.ChannelName,
c.Depth,
c.BackendDepth,
c.InFlightCount,
c.DeferredCount,
c.RequeueCount,
c.TimeoutCount,
c.MessageCount,
c.E2eProcessingLatency))
fmt.Fprintf(w, "%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
pausedPrefix,
c.ChannelName,
c.Depth,
c.BackendDepth,
c.InFlightCount,
c.DeferredCount,
c.RequeueCount,
c.TimeoutCount,
c.MessageCount,
c.E2eProcessingLatency,
)
for _, client := range c.Clients {
connectTime := time.Unix(client.ConnectTime, 0)
// truncate to the second
duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
io.WriteString(w, fmt.Sprintf(" [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
fmt.Fprintf(w, " [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n",
client.Version,
client.ClientID,
client.State,
Expand All @@ -593,10 +620,38 @@ func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string,
client.RequeueCount,
client.MessageCount,
duration,
))
)
}
}
}

if len(producerStats) == 0 {
fmt.Fprintf(w, "\nProducers: None\n")
} else {
fmt.Fprintf(w, "\nProducers:")
for _, client := range producerStats {
connectTime := time.Unix(client.ConnectTime, 0)
// truncate to the second
duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
var totalPubCount uint64
for _, v := range client.PubCounts {
totalPubCount += v.Count
}
fmt.Fprintf(w, "\n [%s %-21s] msgs: %-8d connected: %s\n",
client.Version,
client.ClientID,
totalPubCount,
duration,
)
for _, v := range client.PubCounts {
fmt.Fprintf(w, " [%-15s] msgs: %-8d\n",
v.Topic,
v.Count,
)
}
}
}

return buf.Bytes()
}

Expand Down
26 changes: 26 additions & 0 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type errStore struct {
err error
}

type Client interface {
Stats() ClientStats
IsProducer() bool
}

type NSQD struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
clientIDSequence int64
Expand All @@ -52,6 +57,9 @@ type NSQD struct {

topicMap map[string]*Topic

clientLock sync.RWMutex
clients map[int64]Client

lookupPeers atomic.Value

tcpListener net.Listener
Expand Down Expand Up @@ -82,6 +90,7 @@ func New(opts *Options) *NSQD {
n := &NSQD{
startTime: time.Now(),
topicMap: make(map[string]*Topic),
clients: make(map[int64]Client),
exitChan: make(chan int),
notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -213,6 +222,23 @@ func (n *NSQD) GetStartTime() time.Time {
return n.startTime
}

func (n *NSQD) AddClient(clientID int64, client Client) {
n.clientLock.Lock()
n.clients[clientID] = client
n.clientLock.Unlock()
}

func (n *NSQD) RemoveClient(clientID int64) {
n.clientLock.Lock()
_, ok := n.clients[clientID]
if !ok {
n.clientLock.Unlock()
return
}
delete(n.clients, clientID)
n.clientLock.Unlock()
}

func (n *NSQD) Main() {
var err error
ctx := &context{n}
Expand Down
8 changes: 8 additions & 0 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {

clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
p.ctx.nsqd.AddClient(client.ID, client)

// synchronize the startup of messagePump in order
// to guarantee that it gets a chance to initialize
Expand Down Expand Up @@ -117,6 +118,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {
client.Channel.RemoveClient(client.ID)
}

p.ctx.nsqd.RemoveClient(client.ID)
return err
}

Expand Down Expand Up @@ -799,6 +801,8 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}

client.PublishedMessage(topicName, 1)

return okBytes, nil
}

Expand Down Expand Up @@ -850,6 +854,8 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error())
}

client.PublishedMessage(topicName, uint64(len(messages)))

return okBytes, nil
}

Expand Down Expand Up @@ -912,6 +918,8 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error())
}

client.PublishedMessage(topicName, 1)

return okBytes, nil
}

Expand Down
Loading

0 comments on commit e0e08a2

Please sign in to comment.