Skip to content

Commit

Permalink
add getStats api (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuthus5 authored May 26, 2023
1 parent e3c9f0d commit 7fc264c
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 7 deletions.
2 changes: 1 addition & 1 deletion padmin/non_persistent_topics_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (n *NonPersistentTopicsImpl) GetStats(tenant string, namespace string, topi
panic("implement me")
}

func (n *NonPersistentTopicsImpl) GetPartitionedStats(tenant string, namespace string, topic string) ([]*TopicStatistics, error) {
func (n *NonPersistentTopicsImpl) GetPartitionedStats(tenant string, namespace string, topic string) (*TopicStatistics, error) {
//TODO implement me
panic("implement me")
}
Expand Down
27 changes: 22 additions & 5 deletions padmin/persistent_topics_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,30 @@ type PersistentTopicsImpl struct {
}

func (p *PersistentTopicsImpl) GetStats(tenant, namespace, topic string) (*TopicStatistics, error) {
//TODO implement me
panic("implement me")
resp, err := p.cli.Get(fmt.Sprintf(UrlPersistentGetStatsForTopicFormat, tenant, namespace, topic))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var body = new(TopicStatistics)
if err := EasyReader(resp, body); err != nil {
return nil, err
}
return body, nil
}

func (p *PersistentTopicsImpl) GetPartitionedStats(tenant, namespace, topic string) ([]*TopicStatistics, error) {
//TODO implement me
panic("implement me")
func (p *PersistentTopicsImpl) GetPartitionedStats(tenant, namespace, topic string) (*TopicStatistics, error) {
resp, err := p.cli.Get(fmt.Sprintf(UrlPersistentGetStatsForPartitionedTopicFormat, tenant, namespace, topic))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var body = new(TopicStatistics)
if err := EasyReader(resp, body); err != nil {
return nil, err
}

return body, nil
}

func (p *PersistentTopicsImpl) GetStatsInternal(tenant, namespace, topic string) (*TopicInternalStats, error) {
Expand Down
62 changes: 62 additions & 0 deletions padmin/persistent_topics_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,65 @@ func TestPersistentTopicsImpl_GetStatsInternalForTopic(t *testing.T) {
require.Nil(t, err)
t.Logf("get stats: %+v", stats)
}

func TestPersistentTopicsImpl_GetStats(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
namespaces, err := admin.Namespaces.List(testTenant)
require.Nil(t, err)
assert.Contains(t, namespaces, fmt.Sprintf("%s/%s", testTenant, testNs))
testTopic := RandStr(8)
err = admin.PersistentTopics.CreateNonPartitioned(testTenant, testNs, testTopic)
require.Nil(t, err)
topicList, err := admin.PersistentTopics.ListNonPartitioned(testTenant, testNs)
require.Nil(t, err)
if len(topicList) != 1 {
t.Fatal("topic list should have one topic")
}
if topicList[0] != fmt.Sprintf("persistent://%s/%s/%s", testTenant, testNs, testTopic) {
t.Fatal("topic name should be equal")
}
stats, err := admin.PersistentTopics.GetStats(testTenant, testNs, testTopic)
require.Nil(t, err)
t.Logf("get stats: %+v", stats)
}

func TestPersistentTopicsImpl_GetPartitionedStats(t *testing.T) {
broker := startTestBroker(t)
defer broker.Close()
admin := NewTestPulsarAdmin(t, broker.webPort)
testTenant := RandStr(8)
err := admin.Tenants.Create(testTenant, TenantInfo{
AllowedClusters: []string{"standalone"},
})
require.Nil(t, err)
testNs := RandStr(8)
err = admin.Namespaces.Create(testTenant, testNs)
require.Nil(t, err)
namespaces, err := admin.Namespaces.List(testTenant)
require.Nil(t, err)
assert.Contains(t, namespaces, fmt.Sprintf("%s/%s", testTenant, testNs))
testTopic := RandStr(8)
err = admin.PersistentTopics.CreatePartitioned(testTenant, testNs, testTopic, 2)
require.Nil(t, err)
topicList, err := admin.PersistentTopics.ListPartitioned(testTenant, testNs)
require.Nil(t, err)
if len(topicList) != 1 {
t.Fatal("topic list should have one topic")
}
if topicList[0] != fmt.Sprintf("persistent://%s/%s/%s", testTenant, testNs, testTopic) {
t.Fatal("topic name should be equal")
}
statsList, err := admin.PersistentTopics.GetPartitionedStats(testTenant, testNs, testTopic)
require.Nil(t, err)
t.Logf("get partitioned stats:%+v", statsList)
}
40 changes: 39 additions & 1 deletion padmin/stats_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package padmin

type TopicStats interface {
GetStats(string, string, string) (*TopicStatistics, error)
GetPartitionedStats(string, string, string) ([]*TopicStatistics, error)
GetPartitionedStats(string, string, string) (*TopicStatistics, error)
GetStatsInternal(string, string, string) (*TopicInternalStats, error)
GetPartitionedStatsInternal(string, string, string) (*PartitionedTopicInternalStats, error)
}
Expand Down Expand Up @@ -62,6 +62,37 @@ type PartitionedTopicInternalStats struct {
}

type TopicStatistics struct {
MsgRateIn float64 `json:"msgRateIn,omitempty"`
MsgThroughputIn float64 `json:"msgThroughputIn,omitempty"`
MsgRateOut float64 `json:"msgRateOut,omitempty"`
MsgThroughputOut float64 `json:"msgThroughputOut,omitempty"`
BytesInCounter uint64 `json:"bytesInCounter,omitempty"`
MsgInCounter uint64 `json:"msgInCounter,omitempty"`
BytesOutCounter uint64 `json:"bytesOutCounter,omitempty"`
MsgOutCounter uint64 `json:"msgOutCounter,omitempty"`
AverageMsgSize float64 `json:"averageMsgSize,omitempty"`
MsgChunkPublished bool `json:"msgChunkPublished,omitempty"`
StorageSize uint64 `json:"storageSize,omitempty"`
BacklogSize uint64 `json:"backlogSize,omitempty"`
PublishRateLimitedTimes uint64 `json:"publishRateLimitedTimes,omitempty"`
EarliestMsgPublishTimeInBacklogs uint64 `json:"earliestMsgPublishTimeInBacklogs,omitempty"`
OffloadedStorageSize uint64 `json:"offloadedStorageSize,omitempty"`
LastOffloadLedgerId uint64 `json:"lastOffloadLedgerId,omitempty"`
LastOffloadSuccessTimeStamp uint64 `json:"lastOffloadSuccessTimeStamp,omitempty"`
LastOffloadFailureTimeStamp uint64 `json:"lastOffloadFailureTimeStamp,omitempty"`
OngoingTxnCount uint64 `json:"ongoingTxnCount,omitempty"`
AbortedTxnCount uint64 `json:"abortedTxnCount,omitempty"`
CommittedTxnCount uint64 `json:"committedTxnCount,omitempty"`
Publishers []string `json:"publishers,omitempty"`
WaitingPublishers uint64 `json:"waitingPublishers,omitempty"`
Subscriptions map[string]string `json:"subscriptions,omitempty"`
Replication map[string]string `json:"replication,omitempty"`
DeduplicationStatus string `json:"deduplicationStatus,omitempty"`
NonContiguousDeletedMessagesRanges uint64 `json:"nonContiguousDeletedMessagesRanges,omitempty"`
NonContiguousDeletedMessagesRangesSerializedSize uint64 `json:"nonContiguousDeletedMessagesRangesSerializedSize,omitempty"`
DelayedMessageIndexSizeInBytes uint64 `json:"delayedMessageIndexSizeInBytes,omitempty"`
Compaction Compaction `json:"compaction"`
OwnerBroker string `json:"ownerBroker,omitempty"`
}

type CursorStats struct {
Expand All @@ -82,3 +113,10 @@ type CursorStats struct {
SubscriptionHavePendingReplayRead bool `json:"subscriptionHavePendingReplayRead"`
Properties map[string]int64 `json:"properties"`
}

type Compaction struct {
LastCompactionRemovedEventCount uint64 `json:"lastCompactionRemovedEventCount,omitempty"`
LastCompactionSucceedTimestamp uint64 `json:"lastCompactionSucceedTimestamp,omitempty"`
LastCompactionFailedTimestamp uint64 `json:"lastCompactionFailedTimestamp,omitempty"`
LastCompactionDurationTimeInMills uint64 `json:"lastCompactionDurationTimeInMills,omitempty"`
}

0 comments on commit 7fc264c

Please sign in to comment.